{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Cheat Sheet (pyspark)\n",
    "https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Big Data\n",
    "\n",
    "#### The three Vs\n",
    "\n",
    "##### Volume¶\n",
    "Volume refers to the amount of data generated through websites, portals and online applications in a data-driven business. Especially for online retailers, volume encompasses the available data that are out there and need to be assessed for relevance.\n",
    "\n",
    "##### Velocity¶\n",
    "Velocity refers to the speed with which data is generated, and as internet speeds have increased and the number of users has increased, the velocity has also increased substantially.\n",
    "\n",
    "##### Variety¶\n",
    "Variety in Big Data refers to all the structured and unstructured data that has the possibility of getting generated either by humans or by machines. Structured data is whatever data you could store in a spreadsheet. It can easily be cataloged and summary statistics can be calculated for it. Unstructured data are raw things like texts, tweets, pictures, videos, emails, voice mails, hand-written text, ECG reading, and audio recordings. Humans can only make sense of data that is structured, and it is usually up to data scientists to create some organization and structure to unstructured data."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Big Data Concepts\n",
    "- clustered computing - collection of resources of multiple machines\n",
    "- parallel computing - simultaneous computation\n",
    "- distributed computing - collection of nodes that run in parallel\n",
    "- batch processing - breaking job into smaller pieces and running on individual machines\n",
    "- real-time processing - immediate processing of data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark features\n",
    "- distributed cluster computing framework\n",
    "- efficient in-memory computations for large datasets\n",
    "- lightning fast data processing framework\n",
    "- provides support for Java, Scala, Python, R and SQL\n",
    "- local mode - single machine\n",
    "    - debugging, demo, testing\n",
    "- cluster mode - set of pre-defined machine\n",
    "- no code change is necessary from local -> cluster mode\n",
    "\n",
    "\n",
    "### Spark components\n",
    "- Apache Spark Core (RDD API)\n",
    "    - Spark SQL (structured data processing)\n",
    "    - Mllib ML (machine learning)\n",
    "    - GraphX (DAGs and other graphs)\n",
    "    - Spark Streaming (real-time processing)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parallel and Distributed Computing with Map-Reduce\n",
    "\n",
    "MapReduce is a programming paradigm that enables the ability to scale across hundreds or thousands of servers for big data analytics. The underlying concept can be somewhat difficult to grasp, because this paradigm differs from the traditional programming practices.\n",
    "\n",
    "*In a nutshell, the term \"MapReduce\" refers to two distinct tasks. The first is the __Map__ job, which takes one set of data and transforms it into another set of data, where individual elements are broken down into tuples __(key/value pairs)__, while the __Reduce__ job takes the output from a map as input and combines those data tuples into a smaller set of tuples.*\n",
    "\n",
    "#### Distributed Processing Systems\n",
    ">A distributed processing system is a group of computers in a network working in tandem to accomplish a task\n",
    "\n",
    "#### Parallel Processing\n",
    "With parallel computing:\n",
    "\n",
    "* a larger problem is broken up into smaller pieces\n",
    "* every part of the problem follows a series of instructions\n",
    "* each one of the instructions is executed simultaneously on different processors\n",
    "* all of the answers are collected from the small problems and combined into one final answer\n",
    "\n",
    "#### MapReduce process\n",
    "\n",
    "##### 1. MAP Task ((Splitting & Mapping)\n",
    "The dataset that needs processing must first be transformed into <key:value> pairs and split into fragments, which are then assigned to map tasks. Each computing cluster is assigned a number of map tasks, which are subsequently distributed among its nodes. In this example, let's assume that we are using 5 nodes (a server with 5 different worker.\n",
    "\n",
    "First, split the data from one file or files into however many nodes are being used.\n",
    "\n",
    "We will then use the map function to create key value pairs represented by:   \n",
    "*{animal}* , *{# of animals per zoo}* \n",
    "\n",
    "After processing of the original key:value pairs, some __intermediate__ key:value pairs are generated. The intermediate key:value pairs are __sorted by their key values__ to create a new list of key:value pairs.\n",
    "\n",
    "##### 2. Shuffling\n",
    "This list from the map task is divided into a new set of fragments that sorts and shuffles the mapped objects into an order or grouping that will make it easier to reduce them. __The number these new fragments, will be the same as the number of the reduce tasks__. \n",
    "\n",
    "##### 3. REDUCE Task (Reducing)\n",
    "Now, every properly shuffled segment will have a reduce task applied to it. After the task is completed, the final output is written onto a file system. The underlying file system is usually HDFS (Hadoop Distributed File System). \n",
    "\n",
    "It's important to note that MapReduce will generally only be powerful when dealing with large amounts of data. When using on a small dataset, it will be faster to perform operations not in the MapReduce framework.\n",
    "\n",
    "There are two groups of entities in this process to ensuring that the map reduce task gets done properly:\n",
    "\n",
    "__Job Tracker__: a \"master\" node that informs the other nodes which map and reduce jobs to complete\n",
    "\n",
    "__Task Tracker__: the \"worker\" nodes that complete the map and reduce operations\n",
    "\n",
    "There are different names for these components depending on the technology used, but there will always be a master node that informs worker nodes what tasks to perform."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### PySpark\n",
    "- Spark is written in Scala, so pyspark was created\n",
    "\n",
    "#### shell\n",
    "- spark shell for scala\n",
    "- pyspark\n",
    "- sparkR"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark UI\n",
    "- Spark Task - unit of exec. that runs on 1 cpu\n",
    "- Spark Stage - group of tasks that perform the same computation in parallel, each task running on a different subset of the data\n",
    "- Spark Job - computation triggered by action, sliced into stages\n",
    "- uses port 4040, 4041, 4042, 4043\n",
    "\n",
    "#### Tabs\n",
    "\n",
    "##### Jobs\n",
    "- shows Spark Jobs\n",
    "##### Stages\n",
    "- shows the stages and details involved in a job (shown in reverse order)\n",
    "##### Storage\n",
    "- inspect caches\n",
    "##### Environment\n",
    "##### Executors\n",
    "##### SQL\n",
    "- shows details of a SQL query"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Caching Data\n",
    "- keeps data in memory\n",
    "- caching is lazy\n",
    "- cache selectively\n",
    "\n",
    "        df.cache() - caches a dataframe\n",
    "        df.unpersist() - uncaches\n",
    "        df.is_cached - checks if df is cached\n",
    "        df.storageLevel - provides details of how df is cached\n",
    "            - useDisk\n",
    "            - useMemory\n",
    "            - useOffHeap #offheap storage\n",
    "            - deserialized \n",
    "            - replication\n",
    "        spark.catalog.isCached(tablename='df') - lets you know if a df is cached\n",
    "        spark.catalog.cacheTable('df') - caches a table\n",
    "        spark.catalog.uncache('df') - uncaches table\n",
    "        spark.catalog.clearCache() - clears all cache"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Logging\n",
    "- for production\n",
    "\n",
    "        import logging\n",
    "        logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(asctime)s-%(levelname)s - %(message)s')\n",
    "        # Log columns of text_df as debug message\n",
    "        logging.debug(\"text_df columns: %s\", text_df.columns)\n",
    "\n",
    "        # Log whether table1 is cached as info message\n",
    "        logging.info(\"table1 is cached: %s\", spark.catalog.isCached(tableName=\"table1\"))\n",
    "\n",
    "        # Log first row of text_df as warning message\n",
    "        logging.warning(\"The first row of text_df:\\n %s\", text_df.first())\n",
    "\n",
    "        # Log selected columns of text_df as error message\n",
    "        logging.error(\"Selected columns: %s\", text_df.select(\"id\", \"word\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark Context\n",
    "- entry point into the spark cluster\n",
    "#### SparkConf\n",
    "- configures a Spark Context including Java properties\n",
    "\n",
    "        pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)\n",
    "\n",
    "#### Create a local spark context with pyspark\n",
    "\n",
    "    import pyspark\n",
    "    sc = pyspark.SparkContext('local[*]')\n",
    "\n",
    "#### Display the type of the Spark Context\n",
    "    \n",
    "    type(sc)\n",
    "\n",
    "#### Use Python's dir(obj) to get a list of all attributes of SparkContext\n",
    "    \n",
    "    dir(sc)\n",
    "\n",
    "#### Use Python's help ( help(object) ) function to get information on attributes and methods for sc object. \n",
    "    \n",
    "    help(sc)\n",
    "\n",
    "#### Check the number of cores being used\n",
    "    \n",
    "    sc.defaultParallelism \n",
    "\n",
    "#### Check for the current version of Spark\n",
    "     \n",
    "     sc.version\n",
    "    \n",
    "#### Check the name of application currently running in spark environment\n",
    "    \n",
    "    sc.appName\n",
    "    \n",
    "#### Access complete configuration settings (including all defaults) for the current spark context \n",
    "    \n",
    "    sc._conf.getAll()\n",
    "    \n",
    "#### Shut down SparkContext\n",
    "    \n",
    "    sc.stop()\n",
    "\n",
    "#### show python version\n",
    "\n",
    "    sc.pythonVer\n",
    "    \n",
    "#### show master location of sparkcontext\n",
    "\n",
    "    sc.master"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Spark DataFrame\n",
    "- The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDD\n",
    "- To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.\n",
    "\n",
    "        # Import SparkSession from pyspark.sql\n",
    "        from pyspark.sql import SparkSession\n",
    "\n",
    "        # Create my_spark\n",
    "        my_spark = SparkSession.builder.getOrCreate()\n",
    "\n",
    "        # Print my_spark\n",
    "        print(my_spark)\n",
    "        \n",
    "#### catalog\n",
    "- lists all data inside your cluster\n",
    "\n",
    "        print(spark.catalog.listTables())\n",
    "        \n",
    "####  SQL queries\n",
    "- SQL query format\n",
    "\n",
    "        # create a SQL query\n",
    "        query = \"FROM flights SELECT * LIMIT 10\"\n",
    "        # Get the first 10 rows of flights\n",
    "        flights10 = spark.sql(query)\n",
    "        # Show the results\n",
    "        flights10.show()\n",
    "        \n",
    "#### Spark Cluster to Pandas DataFrame\n",
    "- convert with the .toPandas() method\n",
    "\n",
    "        # create a query\n",
    "        query = \"SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest\"\n",
    "        # Run the query\n",
    "        flight_counts = spark.sql(query)\n",
    "        # Convert the results to a pandas DataFrame\n",
    "        pd_counts = flight_counts.toPandas()\n",
    "        # Print the head of pd_counts\n",
    "        print(pd_counts.head())\n",
    "        \n",
    "#### DataFrame to Spark Cluster\n",
    "- the .createDataFrame() method will convert a DF to a Spark Cluster\n",
    "        \n",
    "        spark_temp = spark.createDataFrame('file_path', schema=schema)\n",
    "        \n",
    "#### Temporary View\n",
    "- will allow spark to create a temporary cluster to use spark commands on a DataFrame\n",
    "  \n",
    "\n",
    "        spark_temp.createOrReplaceTempView('new_table_name')\n",
    "        \n",
    "#### Direct read-in of datasources\n",
    "- allows direct conversion to a spark dataframe for several file types\n",
    "\n",
    "        #  file path to .csv\n",
    "        file_path = \"/usr/local/share/datasets/airports.csv\"\n",
    "\n",
    "        # Read in the airports data\n",
    "        airports = spark.read.csv(file_path, header=True, inferSchema=True)\n",
    "\n",
    "        # Show the data\n",
    "        airports.show()\n",
    "        \n",
    "#### creating columns\n",
    "- In Spark you can do this using the .withColumn() method, which takes two arguments. First, a string with the name of your new column, and second the new column input\n",
    "- Spark dataframes are immutable, so adding new columns means reassigning the df.\n",
    "\n",
    "        df = df.withColumn(\"newCol\", df.column.mean()) #using the mean of another column to create values for new column \n",
    "        \n",
    "#### filtering results\n",
    "- similar to the WHERE clause in SQL\n",
    "- two ways of filtering:\n",
    "    - passing a string value will filter out the values listed\n",
    "    - passing in the direct df.column with the filter will create a new column will boolean values.\n",
    "    \n",
    "            # Filter flights by passing a string\n",
    "            long_flights1 = flights.filter(\"distance > 1000\")\n",
    "\n",
    "            # Filter flights by passing a column of boolean values\n",
    "            long_flights2 = flights.filter(flights.distance > 1000)\n",
    "            \n",
    "#### select columns\n",
    "- similar to SQL SELECT statement\n",
    "- takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the df.colName syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it\n",
    "- can also use column-wise operations \n",
    "\n",
    "        # Select columns using column strings\n",
    "        selected1 = flights.select('tailnum', 'origin', 'dest')\n",
    "\n",
    "        # Select columns using df.column format\n",
    "        temp = flights.select(flights.origin, flights.dest, flights.carrier)\n",
    "        #using operations to create a column\n",
    "        avg_speed = (flights.distance/(flights.air_time/60))\n",
    "        \n",
    "#### alias\n",
    "- renames a column when selecting\n",
    "\n",
    "        #rename column to avg_speed\n",
    "        avg_speed = (flights.distance/(flights.air_time/60)).alias(\"avg_speed\")\n",
    "        \n",
    "#### aggregating\n",
    "- you can aggregate with common methods by using the .groupBy() method\n",
    "- creating groups using the .groupby() allows them to be part of a pyspark.sql.GroupedData class\n",
    "        \n",
    "        #using filter and groupby to fingd the shortest flight from PDX\n",
    "        flights.filter(flights.origin == 'PDX').groupBy().min(\"distance\").show()\n",
    "- you can also use the .agg() function which allows use of any of the pyspark.sql.functions library\n",
    "\n",
    "        #import functions\n",
    "        import pyspark.sql.functions as F\n",
    "        #find standard deviation of a columns\n",
    "        df.column.agg(F.stddev()).show()\n",
    "        \n",
    "#### joining\n",
    "- performed using the df.join() method\n",
    "    \n",
    "        joined_df = df.join(joining_df, on='joining_column', how='how_to_join')\n",
    "        \n",
    "#### dropDuplicates()\n",
    "- drops duplicate rows in DataFrame\n",
    "\n",
    "#### count()\n",
    "- counts the rows in a DataFrame\n",
    "\n",
    "#### printSchema()\n",
    "- prints the schema of the DataFrame\n",
    "\n",
    "#### hasattr(x, 'toArray')\n",
    "\n",
    "- determines the vactor type (dense, sparse)\n",
    "    - dense array: [1.0,0.0,0.0,3.0]\n",
    "    - sparse array: (4, [0,3], [1.0,3.0])\n",
    "    - (size of array, [# of non-zero indices in array], [values of items in array]\n",
    "    \n",
    "#### x.numNonzeros()\n",
    "- determines if an array is empty"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### User Defined Functions Spark SQL\n",
    "- Extract\n",
    "- Transform\n",
    "- Select\n",
    "#### UDF\n",
    "- User Defined Function\n",
    "\n",
    "        from pyspark.sql.functions import udf\n",
    "        \n",
    "#### UDF Return Type\n",
    "- data type to be returned by UDF\n",
    "\n",
    "        from pyspark.sql.types import BooleanType, StringType, IntegerType, FloatType, ArrayType\n",
    "        # example\n",
    "        short_udf = udf(lambda x: True if not x or len(x) < 10 else False, BooleanType())\n",
    "        \n",
    "#### Example \n",
    "\n",
    "    # Returns true if the value is a nonempty vector\n",
    "    nonempty_udf = udf(lambda x:  \n",
    "        True if (x and hasattr(x, \"toArray\") and x.numNonzeros())\n",
    "        else False, BooleanType())\n",
    "\n",
    "    # Returns first element of the array as string\n",
    "    s_udf = udf(lambda x: str(x[0]) if (x and type(x) is list and len(x) > 0)\n",
    "        else '', StringType())\n",
    "     # Show the rows where doc contains the item '5'\n",
    "    df_before.where(array_contains('doc', '5')).show()\n",
    "\n",
    "    # UDF removes items in TRIVIAL_TOKENS from array\n",
    "    rm_trivial_udf = udf(lambda x:\n",
    "                             list(set(x) - TRIVIAL_TOKENS) if x\n",
    "                             else x,\n",
    "                             ArrayType(StringType()))\n",
    "\n",
    "    # Remove trivial tokens from 'in' and 'out' columns of df2\n",
    "    df_after = df_before.withColumn('in', rm_trivial_udf('in'))\\\n",
    "                        .withColumn('out', rm_trivial_udf('out'))\n",
    "\n",
    "    # Show the rows of df_after where doc contains the item '5'\n",
    "    df_after.where(array_contains('doc','5')).show() "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Loading Natural Language Using Spark SQL dot notation\n",
    "\n",
    "    # Load the dataframe\n",
    "    df = spark.read.load('sherlock_sentences.parquet')\n",
    "\n",
    "    # Filter and show the first 5 rows\n",
    "    df.where('id > 70').show(5, truncate=False)\n",
    "    # Split the clause column into a column called words \n",
    "    split_df = clauses_df.select(split('clause', ' ').alias('words'))\n",
    "    split_df.show(5, truncate=False)\n",
    "\n",
    "    # Explode the words column into a column called word \n",
    "    exploded_df = split_df.select(explode('words').alias('word'))\n",
    "    exploded_df.show(10)\n",
    "\n",
    "    # Count the resulting number of rows in exploded_df\n",
    "    print(\"\\nNumber of rows: \", exploded_df.count())\n",
    "    \n",
    "### Moving Windows Using SQL statements\n",
    "\n",
    "    # Word for each row, previous two and subsequent two words\n",
    "    query = \"\"\"\n",
    "    SELECT\n",
    "    part,\n",
    "    LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,\n",
    "    LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,\n",
    "    word AS w3,\n",
    "    LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,\n",
    "    LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5\n",
    "    FROM text\n",
    "    \"\"\"\n",
    "    spark.sql(query).where(\"part = 12\").show(10)\n",
    "    # Repartition text_df into 12 partitions on 'chapter' column\n",
    "    repart_df = text_df.repartition(12, 'chapter')\n",
    "\n",
    "    # Prove that repart_df has 12 partitions\n",
    "    repart_df.rdd.getNumPartitions()\n",
    "    \n",
    "### Finding Word Sequences Using Spark SQL Queries\n",
    "\n",
    "#### Find the top 10 sequences of five words\n",
    "\n",
    "    query = \"\"\"\n",
    "    SELECT w1, w2, w3, w4, w5, COUNT(*) AS count FROM (\n",
    "       SELECT word AS w1,\n",
    "       LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,\n",
    "       LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3,\n",
    "       LEAD(word,3) OVER(PARTITION BY part ORDER BY id ) AS w4,\n",
    "       LEAD(word,4) OVER(PARTITION BY part ORDER BY id ) AS w5\n",
    "       FROM text\n",
    "    )\n",
    "    GROUP BY w1, w2, w3, w4, w5\n",
    "    ORDER BY count DESC\n",
    "    LIMIT 10 \"\"\"\n",
    "    df = spark.sql(query)\n",
    "    df.show()\n",
    "    \n",
    "#### Unique 5-tuples sorted in descending order\n",
    "\n",
    "    spark.sql(\"\"\"\n",
    "    SELECT DISTINCT w1, w2, w3, w4, w5 FROM (\n",
    "       SELECT word AS w1,\n",
    "       LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2,\n",
    "       LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3,\n",
    "       LEAD(word,3) OVER(PARTITION BY part ORDER BY id ) AS w4,\n",
    "       LEAD(word,4) OVER(PARTITION BY part ORDER BY id ) AS w5\n",
    "       FROM text\n",
    "    )\n",
    "    ORDER BY w1 DESC, w2 DESC, w3 DESC, w4 DESC, w5 DESC \n",
    "    LIMIT 10\n",
    "    \"\"\").show()\n",
    "    \n",
    "####   Most frequent 3-tuple per chapter\n",
    "\n",
    "    query = \"\"\"\n",
    "    SELECT chapter, w1, w2, w3, count FROM\n",
    "    (\n",
    "      SELECT\n",
    "      chapter,\n",
    "      ROW_NUMBER() OVER (PARTITION BY chapter ORDER BY COUNT DESC) AS row,\n",
    "      w1, w2, w3, count\n",
    "      FROM ( %s )\n",
    "    )\n",
    "    WHERE row = 1\n",
    "    ORDER BY chapter ASC\n",
    "    \"\"\" % subquery\n",
    "\n",
    "    spark.sql(query).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Text Classification Using Spark SQL\n",
    "\n",
    "    # Selects the first element of a vector column\n",
    "    first_udf = udf(lambda x:\n",
    "                float(x.indices[0]) \n",
    "                if (x and hasattr(x, \"toArray\") and x.numNonzeros())\n",
    "                else 0.0,\n",
    "                FloatType())\n",
    "\n",
    "    # Apply first_udf to the output column\n",
    "    df.select(first_udf(\"output\").alias(\"result\")).show(5)\n",
    "    # Add label by applying the get_first_udf to output column\n",
    "    df_new = df.withColumn('label', get_first_udf('output'))\n",
    "\n",
    "    # Show the first five rows \n",
    "    df_new.show(5)\n",
    "    # Transform df using model\n",
    "    result = model.transform(df.withColumnRenamed('in', 'words'))\\\n",
    "            .withColumnRenamed('words', 'in')\\\n",
    "            .withColumnRenamed('vec', 'invec')\n",
    "    result.drop('sentence').show(3, False)\n",
    "\n",
    "    # Add a column based on the out column called outvec\n",
    "    result = model.transform(result.withColumnRenamed('out', 'words'))\\\n",
    "            .withColumnRenamed('words', 'out')\\\n",
    "            .withColumnRenamed('vec', 'outvec')\n",
    "    result.select('invec', 'outvec').show(3,False)\n",
    "    # Import the lit function\n",
    "    from pyspark.sql.functions import lit\n",
    "\n",
    "    # Select the rows where endword is 'him' and label 1\n",
    "    df_pos = df.where(\"endword = 'him'\")\\\n",
    "               .withColumn('label', lit(1))\n",
    "\n",
    "    # Select the rows where endword is not 'him' and label 0\n",
    "    df_neg = df.where(\"endword <> 'him'\")\\\n",
    "               .withColumn('label', lit(0))\n",
    "\n",
    "    # Union pos and neg in equal number\n",
    "    df_examples = df_pos.union(df_neg.limit(df_pos.count()))\n",
    "    print(\"Number of examples: \", df_examples.count())\n",
    "    df_examples.where(\"endword <> 'him'\").sample(False, .1, 42).show(5)\n",
    "    # Split the examples into train and test, use 80/20 split\n",
    "    df_trainset, df_testset = df_examples.randomSplit((.8,.2), 42)\n",
    "\n",
    "    # Print the number of training examples\n",
    "    print(\"Number training: \", df_trainset.count())\n",
    "\n",
    "    # Print the number of test examples\n",
    "    print(\"Number test: \", df_testset.count())\n",
    "    # Import the logistic regression classifier\n",
    "    from pyspark.ml.classification import LogisticRegression\n",
    "\n",
    "    # Instantiate logistic setting elasticnet to 0.0\n",
    "    logistic = LogisticRegression(maxIter=100, regParam=0.4, elasticNetParam=0.0)\n",
    "\n",
    "    # Train the logistic classifer on the trainset\n",
    "    df_fitted = logistic.fit(df_trainset)\n",
    "\n",
    "    # Print the number of training iterations\n",
    "    print(\"Training iterations: \", df_fitted.summary.totalIterations)\n",
    "    # Score the model on test data\n",
    "    testSummary = df_fitted.evaluate(df_testset)\n",
    "\n",
    "    # Print the AUC metric\n",
    "    print(\"\\ntest AUC: %.3f\" % testSummary.areaUnderROC)\n",
    "    # Apply the model to the test data\n",
    "    predictions = df_fitted.transform(df_testset).select(fields)\n",
    "\n",
    "    # Print incorrect if prediction does not match label\n",
    "    for x in predictions.take(8):\n",
    "        print()\n",
    "        if x.label != int(x.prediction):\n",
    "            print(\"INCORRECT ==> \")\n",
    "        for y in fields:\n",
    "            print(y,\":\", x[y])\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data Visualization using DataFrames\n",
    "####  pyspark_dist_explore library\n",
    "- quick insights\n",
    "\n",
    "##### hist()\n",
    "\n",
    "    #load data\n",
    "    test_df = spark.read.csv('test_csv')\n",
    "    #select data to be visualized\n",
    "    test_df_col = test_df.select('Col_name')\n",
    "    #create histogram\n",
    "    hist(test_df_col, bins=10, color='red')\n",
    "\n",
    "##### distplot()\n",
    "    #load data\n",
    "    test_df = spark.read.csv('test_csv')\n",
    "    #select data to be visualized\n",
    "    test_df_col = test_df.select('Col_name')\n",
    "    #create histogram\n",
    "    distplot(test_df_col, bins=10, color='red')\n",
    "\n",
    "##### pandas_histogram()\n",
    "\n",
    "    #load data\n",
    "    test_df = spark.read.csv('test_csv')\n",
    "    #select data to be visualized\n",
    "    test_df_col = test_df.select('Col_name')\n",
    "    #create histogram\n",
    "    pandas_histogram(test_df_col, bins=10, color='red')\n",
    "    \n",
    "#### toPandas()\n",
    "- use toPandas method to convert to pandas DataFrame\n",
    "- possible to use built in visual pandas methods\n",
    "\n",
    "#### HandySpark library\n",
    "- new package\n",
    "- histogram example shown\n",
    "\n",
    "    \n",
    "        #load data\n",
    "        test_df = spark.read.csv('test_csv')\n",
    "        #convert to Handy\n",
    "        test_df_col = test_df.toHandy()\n",
    "        #create histogram\n",
    "        test_df_col.cols['col_name'].hist()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Resilient Distributed Datasets (RDDs)\n",
    "\n",
    "Resilient Distributed Datasets (RDD) are fundamental data structures of Spark. An RDD is, essentially, the Spark representation of a set of data, spread across multiple machines, with APIs to let you act on it. An RDD could come from any datasource, e.g. text files, a database, a JSON file etc.\n",
    "- partitions - logical division of a large distributed dataset\n",
    "- performs two types of operations:\n",
    "    - transformations - create new RDDs\n",
    "    - actions perform computation on RDDs\n",
    "\n",
    "#### load data\n",
    "    rdd = sc.parallelize(data,numSlices=10) #creates 10 partitions\n",
    "    print(type(rdd))\n",
    "    #load text data\n",
    "    rdd2 = sc.textFile('file_name.file', minPartitions = 5)#defines a minimum no. of partitions\n",
    "    \n",
    "#### Get # of partitions\n",
    "    rdd.getNumPartitions()\n",
    "    \n",
    "### Actions\n",
    "    rdd.count() #returns the total count of items in the RDD\n",
    "    rdd.first() #returns the first item in the RDD\n",
    "    rdd.take(number) #returns the first n items in the RDD\n",
    "    rdd.top(number) #returns the top n items\n",
    "    rdd.collect() #returns everything from your RDD\n",
    "    reduce(function) #aggregates elements of a RDD\n",
    "    saveAsTextFile('file_name') #saves RDD as text file with each partition as a separate file\n",
    "    coalesce() #used with saveAsTextFile will save to a single text file\n",
    "    \n",
    "####  Use a reduce method with a lambda function to to add up all of the values in the RDD\n",
    "\n",
    "    selected_items.reduce(lambda x,y :x + y)\n",
    "    \n",
    "    \n",
    "####  sortBy method on the RDD to rank the users from highest spending to least spending.\n",
    "\n",
    "    total_spent.sortBy(lambda x: x[1],ascending = False)\n",
    "    \n",
    "### Transformations\n",
    "    \n",
    "#### Applying lambda function to data\n",
    "\n",
    "    discounted = revenue_minus_tax.map(lambda x : x*0.9)\n",
    "    \n",
    "#### chain methods in spark\n",
    "\n",
    "    price_items.map(sales_tax).map(lambda x : x*0.9).top(15)\n",
    "    \n",
    "#### See the full lineage of all the operations that have been performed on an RDD\n",
    "    discounted.toDebugString()\n",
    "    \n",
    "#### Flatmap (creates a list of data - all same level)\n",
    "\n",
    "    flat_mapped = price_items.flatMap(lambda x : (x, x*0.92*0.9 ))\n",
    "\n",
    "#### A filter method is a specialized form of a map function that only returns the items that match a certain criteria\n",
    "\n",
    "    selected_items = discounted.filter(lambda x: x>300)\n",
    "    \n",
    "#### union() returns the union of one RDD to another RDD\n",
    "\n",
    "    combinedRDD = RDD1.union(RDD2)\n",
    "    \n",
    "### Pair RDDS\n",
    "- special data structure to work with key: value pairs\n",
    "- Key is identifier and Value is data\n",
    "- all regular transformations work\n",
    "- have to pass functions that operate on key:value pairs\n",
    "\n",
    "#### create pair RDD\n",
    "\n",
    "    my_tuple =[('Sam':, 23), ('Mary': 34)]\n",
    "    pairRDD = sc.parallelize(my_tuple)\n",
    "    \n",
    "#### reduceByKey to perform reducing operations while grouping by keys.\n",
    "\n",
    "    total_spent = sales_data.reduceByKey(lambda x,y :x + y)\n",
    "    \n",
    "#### groupByKey groups values with the same key\n",
    "\n",
    "    groupByKey(RDD to group)\n",
    "\n",
    "#### sortByKey sorts values by key\n",
    "\n",
    "    sortByKey(ascending=True)\n",
    "\n",
    "#### join() joins two pair RDDs based on their key\n",
    "    \n",
    "    RDD1.join(RDD2)\n",
    "    \n",
    "#### countByKey() counts number of elements for each key\n",
    "\n",
    "    for key, val in rdd.countByKey().items():\n",
    "    print(key, value)\n",
    "\n",
    "#### collectAsMap() returns key:value pairs in the RDD as a dictionary\n",
    "\n",
    "    sc.parallelize([(1,2),(3,4)]).collectAsMap()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Machine Learning in Spark\n",
    "\n",
    "[mllib](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html)\n",
    "- mllib library is built upon the RDDs \n",
    "[ml](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html)\n",
    "- ml library is built on Spark DataFrames\n",
    "\n",
    "\n",
    "    \n",
    "#### ml library\n",
    "\n",
    "- used sklearn as an inspiration for their implementation of a machine learning library. As a result, many of the methods and functionalities look similar, but there are some crucial distinctions. There two main classes\n",
    "\n",
    "`Transformer`\n",
    "- An algorithm that transforms one pyspark DataFrame into another DataFrame.\n",
    "- uses the .transform() method\n",
    "- returns a new dataframe, usually with a new column appended\n",
    "- examples of classes: Bucketizer, PCA\n",
    "\n",
    "`Estimator`\n",
    "- An algorithm that can be fit onto a pyspark DataFrame that can then be used as a Transformer.\n",
    "- uses the .fit() method\n",
    "- returns a model object\n",
    "- examples of calsses: StringIndexerColum, RandomForestModel\n",
    "\n",
    "#### ml library machine learning steps\n",
    "\n",
    "##### ensure proper data types\n",
    "- only takes numbers (decimals, whole numbers) called 'doubles','integers' in Spark\n",
    "- Spark will make a guess on datatypes, if not recognized as a double or integer, you can use the .cast() method in combination of .withColumn() method to convert the datatype (\"integer\", \"double\")\n",
    "\n",
    "        #create new column with numerical data\n",
    "        original_df = original_df.withColumn(\"col_name\", df.col_name.cast('datatype')\n",
    "##### handle categorical variables\n",
    "- there are two steps to take to one-hot-encode categorical variables:\n",
    "- step 1: use StringIndexer to convert distinct variables to individual numbered columns\n",
    "\n",
    "        #create stringindexer\n",
    "        indexer = StringIndexer(inputCol='col_name', outputCol='new_cols_name')\n",
    "- step 2: use OneHotEncoder to OHE the columns\n",
    "        \n",
    "        #create ohe\n",
    "        encoder = OneHotEncoder(inputCol='col_name', outputCol='new_cols_name'\n",
    "##### assemble a vector\n",
    "- Spark modeling requires all columns containing features to be combined in one column\n",
    "- done using VectorAssembler\n",
    "\n",
    "        #assemble matrix into a vector\n",
    "        assembler = VectorAssembler(inputCols=['list', 'of', 'col', 'names'], outputCol='feature')\n",
    "##### create a pipeline\n",
    "- combines all estimators and transformers\n",
    "- wraps the process so you can reuse the named pipeline\n",
    "\n",
    "        #import pipeline\n",
    "        from pyspark.ml import Pipeline\n",
    "        #make pipeline\n",
    "        pipeline = Pipeline(stages=[indexer, encoder, assembler])\n",
    "##### fit and transform data\n",
    "\n",
    "        #fit and transform\n",
    "        piped_data = pipeline.fit(model_data).transform(model_data)\n",
    "##### split data into train and test sets\n",
    "\n",
    "        #split data\n",
    "        training, test = piped_data.randomSplit([.7,.3])\n",
    "##### run selected model\n",
    "- there are many models to choose from, ensure proper package is imported and instantiated\n",
    "##### select evaluation method\n",
    "- select evaluation via pyspark.ml.evaluation\n",
    "        \n",
    "        evaluator = selectedEvaluationMethod()\n",
    "##### hyperparameter tuning\n",
    "- performed using the pyspark.ml.tuning library\n",
    "- if several parameters will be tuned, use ParamGridBuilder() method\n",
    "\n",
    "        #import tuning package\n",
    "        import pyspark.ml.tuning as tune\n",
    "        #create grid\n",
    "        grid = ParamGridBuilder()\n",
    "        #replace the grid with all additions\n",
    "        grid = grid.addGrid(model.parameter, [0,1,2])\n",
    "        grid = grid.addGrid(model.parameter, [0,1,2])\n",
    "        #build the grid\n",
    "        grid = grid.build()\n",
    "##### create the validator        \n",
    "- the tuning module also contains the CrossValidator class which will implement your selected evaluator\n",
    "\n",
    "        #create crossvalidator\n",
    "        cv = tune.CrossValidator(estimator=model_name,\n",
    "                                 estimatorParamMaps=grid,\n",
    "                                 evaluator=evaluator)\n",
    "##### fit training data\n",
    "    best_model = model_name.fit(training)\n",
    "##### predict test data\n",
    "    results = best_model.transform(test)\n",
    "##### evaluate model\n",
    "    print(evaluator.evaluate(test_results))\n",
    "\n",
    "#### Sample ml library Pipeline\n",
    "\n",
    "    from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator\n",
    "    from pyspark.ml.evaluation import RegressionEvaluator\n",
    "\n",
    "    #set pipeline parameters\n",
    "    string_indexer = StringIndexer(inputCol='month',outputCol='month_num',handleInvalid='keep')\n",
    "    one_hot_encoder = OneHotEncoderEstimator(inputCols=['month_num'],outputCols=['month_vec'])\n",
    "    vector_assember = VectorAssembler(inputCols=features,outputCol='features')\n",
    "    random_forest = RandomForestRegressor(featuresCol='features',labelCol='area')\n",
    "    stages =  [string_indexer, one_hot_encoder, vector_assember,random_forest]\n",
    "\n",
    "    pipeline = Pipeline(stages=stages) #instantiate pipeline\n",
    "\n",
    "    params = ParamGridBuilder()\\\n",
    "    .addGrid(random_forest.maxDepth, [5,10,15])\\\n",
    "    .addGrid(random_forest.numTrees, [20,50,100])\\\n",
    "    .build() #performs gridsearch on set parameters\n",
    "\n",
    "    reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area',metricName = 'mae') #evaluates model\n",
    "\n",
    "    cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params,evaluator=reg_evaluator)\n",
    "\n",
    "    cross_validated_model = cv.fit(spark_df) #fits model\n",
    "\n",
    "    cross_validated_model.avgMetrics #returns best metrics based on metricName\n",
    "\n",
    "    #shows selected predictions\n",
    "    predictions = cross_validated_model.transform(spark_df)\n",
    "    predictions.select('prediction','area').show(300)\n",
    "\n",
    "    cross_validated_model.bestModel.stages #checking best model by stage\n",
    "\n",
    "    optimal_rf_model = cross_validated_model.bestModel.stages[3] #looking at stage 3 of process\n",
    "    optimal_rf_model.fe\n",
    "\n",
    "    optimal_rf_model.featureImportances #checking feature importance\n",
    "    \n",
    "#### MLlib library\n",
    "- designed for parallel processing on a cluster\n",
    "- supports scala, R, Java, Python\n",
    "- good for large datasets\n",
    "\n",
    "##### sample imports\n",
    "\n",
    "    #alternative least squares\n",
    "    from pyspark mllib.recommendation import ALS\n",
    "    #logistic regression with LBFGS\n",
    "    from pyspark mllib.classification import LogisticRegressionWithLBFGS\n",
    "    #K means\n",
    "    from pyspark.mllib.clustering import KMeans\n",
    "    \n",
    "### Reccomendation System using MLlib\n",
    "\n",
    "    # Load the data into RDD\n",
    "    data = sc.textFile(file_path)\n",
    "\n",
    "    # Split the RDD \n",
    "    ratings = data.map(lambda l: l.split(','))\n",
    "\n",
    "    # Transform the ratings RDD\n",
    "    ratings_final = ratings.map(lambda line: Rating(int(line[0]), int(line[1]), float(line[2])))\n",
    "\n",
    "    # Split the data into training and test\n",
    "    training_data, test_data = ratings_final.randomSplit([0.8, 0.2])\n",
    "    # Create the ALS model on the training data\n",
    "    model = ALS.train(training_data, rank=10, iterations=10)\n",
    "\n",
    "    # Drop the ratings column \n",
    "    testdata_no_rating = test_data.map(lambda p: (p[0], p[1]))\n",
    "\n",
    "    # Predict the model  \n",
    "    predictions = model.predictAll(testdata_no_rating)\n",
    "    # Prepare ratings data\n",
    "    rates = ratings_final.map(lambda r: ((r[0], r[1]), r[2]))\n",
    "\n",
    "    # Prepare predictions data\n",
    "    preds = predictions.map(lambda r: ((r[0], r[1]), r[2]))\n",
    "\n",
    "    # Join the ratings data with predictions data\n",
    "    rates_and_preds = rates.join(preds)\n",
    "\n",
    "    # Calculate and print MSE\n",
    "    MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()\n",
    "    print(\"Mean Squared Error of the model for the test data = {:.2f}\".format(MSE))\n",
    "\n",
    "### Classification using MLlib\n",
    "\n",
    "    # Load the datasets into RDDs\n",
    "    spam_rdd = sc.textFile(file_path_spam)\n",
    "    non_spam_rdd = sc.textFile(file_path_non_spam)\n",
    "\n",
    "    # Split the email messages into words\n",
    "    spam_words = spam_rdd.flatMap(lambda email: email.split(' '))\n",
    "    non_spam_words = non_spam_rdd.flatMap(lambda email: email.split(' '))\n",
    "\n",
    "    # Print the first element in the split RDD\n",
    "    print(\"The first element in spam_words is\", spam_words.first())\n",
    "    print(\"The first element in non_spam_words is\", non_spam_words.first())\n",
    "    # Create a HashingTf instance with 200 features\n",
    "    tf = HashingTF(numFeatures=200)\n",
    "\n",
    "    # Map each word to one feature\n",
    "    spam_features = tf.transform(spam_words)\n",
    "    non_spam_features = tf.transform(non_spam_words)\n",
    "\n",
    "    # Label the features: 1 for spam, 0 for non-spam\n",
    "    spam_samples = spam_features.map(lambda features:LabeledPoint(1, features))\n",
    "    non_spam_samples = non_spam_features.map(lambda features:LabeledPoint(0, features))\n",
    "\n",
    "    # Combine the two datasets\n",
    "    samples = spam_samples.join(non_spam_samples)\n",
    "    # Split the data into training and testing\n",
    "    train_samples,test_samples = samples.randomSplit([0.8, 0.2])\n",
    "\n",
    "    # Train the model\n",
    "    model = LogisticRegressionWithLBFGS.train(train_samples)\n",
    "\n",
    "    # Create a prediction label from the test data\n",
    "    predictions = model.predict(test_samples.map(lambda x: x.features))\n",
    "\n",
    "    # Combine original labels with the predicted labels\n",
    "    labels_and_preds = test_samples.map(lambda x: x.label).zip(predictions)\n",
    "\n",
    "    # Check the accuracy of the model on the test data\n",
    "    accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_samples.count())\n",
    "    print(\"Model accuracy : {:.2f}\".format(accuracy))\n",
    "    \n",
    "### Clustering using MLlib\n",
    "\n",
    "    # Load the dataset into a RDD\n",
    "    clusterRDD = sc.textFile(file_path)\n",
    "\n",
    "    # Split the RDD based on tab\n",
    "    rdd_split = clusterRDD.map(lambda x: x.split('\\t'))\n",
    "\n",
    "    # Transform the split RDD by creating a list of integers\n",
    "    rdd_split_int = rdd_split.map(lambda x: [int(x[0]), int(x[1])])\n",
    "\n",
    "    # Count the number of rows in RDD \n",
    "    print(\"There are {} rows in the rdd_split_int dataset\".format(rdd_split_int.count()))\n",
    "    # Train the model with clusters from 13 to 16 and compute WSSSE \n",
    "    for clst in range(13, 17):\n",
    "        model = KMeans.train(rdd_split_int, clst, seed=1)\n",
    "        WSSSE = rdd_split_int.map(lambda point: error(point)).reduce(lambda x, y: x + y)\n",
    "        print(\"The cluster {} has Within Set Sum of Squared Error {}\".format(clst, WSSSE))\n",
    "\n",
    "    # Train the model again with the best k \n",
    "    model = KMeans.train(rdd_split_int, k=15, seed=1)\n",
    "\n",
    "    # Get cluster centers\n",
    "    cluster_centers = model.clusterCenters\n",
    "    # Convert rdd_split_int RDD into Spark DataFrame\n",
    "    rdd_split_int_df = spark.createDataFrame(rdd_split_int, schema=[\"col1\", \"col2\"])\n",
    "\n",
    "    # Convert Spark DataFrame into Pandas DataFrame\n",
    "    rdd_split_int_df_pandas = rdd_split_int_df.toPandas()\n",
    "\n",
    "    # Convert \"cluster_centers\" that you generated earlier into Pandas DataFrame\n",
    "    cluster_centers_pandas = pd.DataFrame(cluster_centers, columns=[\"col1\", \"col2\"])\n",
    "\n",
    "    # Create an overlaid scatter plot\n",
    "    plt.scatter(rdd_split_int_df_pandas[\"col1\"], rdd_split_int_df_pandas[\"col2\"])\n",
    "    plt.scatter(cluster_centers_pandas[\"col1\"], cluster_centers_pandas[\"col2\"], color=\"red\", marker=\"x\")\n",
    "    plt.show()\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Simple Spark Word Count Function\n",
    "\n",
    "    stopWordList = ['', 'the','a','in','of','on','at','for','by','i','you','me'] \n",
    "    def wordCount(filename, stopWordlist):\n",
    "        output = sc.textFile(filename)\n",
    "        words1 = lines.flatMap(lambda x: x.split(' '))\n",
    "        words2 = words1.map(lambda x: (x.lower(), 1))\n",
    "        wordCount = words2.reduceByKey(lambda x,y: x+y)\n",
    "        freqWords = wordCount.filter(lambda x:  x[1] >= 5 )\n",
    "        stopWords = freqWords.filter(lambda x:  x[0] in stopWordList) \n",
    "        output = stopWords.collect()\n",
    "\n",
    "        return output"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "learn-env",
   "language": "python",
   "name": "learn-env"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.10"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "calc(100% - 180px)",
    "left": "10px",
    "top": "150px",
    "width": "139.489px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}