{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Homework 7"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "For this Homework we'll explore data analytics with a distributed computing stack, namely Spark + Hadoop's HDFS. You'll configure your own HDFS and Spark \"cluster\" for distributed operation. The master and slave machines will actually all run on the same instance, but the process of configuring and running is identical. You'll be able to view cluster diagonistics on your system as though it were a distributed system. \n",
    "\n",
    "You would apply the same steps to deploy and run a production cluster with HDFS and Spark. The process is really quite simple, but make sure you do each step carefully and verify that you have each stage working before proceeding to the next."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "\n",
    "In this lab we'll be viewing the Spark console from your EC2 instance. The Spark consoles are by default on port 8080, 8081 and 4040 which you can't access directly through your instance's firewall. Therefore you'll have to \n",
    "\n",
    "* Edit your ssh connection script (or Putty config.) on your local machine to add a tunnel from port 8080 on your client to 8080 on your EC2 instance. The format is exactly the same as the tunnels you have from port 8888 etc to the matching port on the instance for Ipython use.\n",
    "\n",
    "* Edit your ssh connection script or Putty on your client machine to add tunnels from port 4040 and 8081 on localhost to 4040 and 8081 respectively on the EC2 instance. These ports have more Spark job info. \n",
    "\n",
    "* Edit ~/.bashrc on the EC2 instance and copy the \"PATH=\" and \"export PATH\" lines from ~/.bash_profile. This is so that the executor, which is actually running on the same machine, gets the right path and runs the right version of Python. \n",
    "\n",
    "Do this before connecting to your instance to run IPython. "
   ]
  },
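  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For example, from a plain ssh command line the tunnels can be added with <code>-L</code> options. This is only a sketch: the key file and instance address below are placeholders that you should replace with your own values (PuTTY users can add the same tunnels under Connection &gt; SSH &gt; Tunnels).\n",
    "\n",
    "<pre>\n",
    "ssh -i &lt;your-key.pem&gt; \\\n",
    "    -L 8888:localhost:8888 \\\n",
    "    -L 8080:localhost:8080 \\\n",
    "    -L 8081:localhost:8081 \\\n",
    "    -L 4040:localhost:4040 \\\n",
    "    ec2-user@&lt;your-instance-address&gt;\n",
    "</pre>"
   ]
  },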
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Executing This Notebook\n",
    "\n",
    "This notebook is designed to run on your EC2 instance. Open an ssh connection on your instance, and create a directory for this HW, say \"hw7\" and <code>cd</code> into it. Right click on the download link at the top right of this page, and then do\n",
    "<pre>\n",
    "wget &lt;paste&gt;\n",
    "</pre>\n",
    "to downlaod this notebook onto your instance. Then start ipython as normal:\n",
    "<pre>\n",
    "ipython notebook\n",
    "</pre>\n",
    "and open this notebook. Later we will connect this running notebook to Spark. \n",
    "\n",
    "As before, its more convenient to access this notebook from a local (laptop) browser. Open a browser, and point it at <code>http://localhost:8888</code> or whaterever ipython printed when it started the notebook."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Configuring the Hadoop File System"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Hadoop's distributed file system comprises a *name node* and various *data nodes*. See e.g. see <a href=\"https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html\">this description</a>\n",
    "\n",
    "Your EC2 instance has a hadoop installation in <code>/opt/hadoop</code>. Notice also that if you type the following from a bash shell:\n",
    "<pre>\n",
    "echo $HADOOP_PREFIX\n",
    "/opt/hadoop\n",
    "</pre>\n",
    "\n",
    "i.e. the variable $HADOOP_PREFIX has already been configured in your <code>~/.bash_profile</code> to point to the HADOOP install directory (if this variable isnt set for some reason, please modify your .bash_profile so that it is)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The hadoop configuration files are in <code>$HADOOP_PREFIX/etc/hadoop</code>. Change to that directory now. Check the contents of each of these configuration files:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<code>core-site.xml</code> contains the specification of the hdfs service:\n",
    "\n",
    "<pre>&lt;configuration&gt;\n",
    "     &lt;property&gt;\n",
    "         &lt;name&gt;fs.default.name&lt;/name&gt;\n",
    "         &lt;value&gt;hdfs://localhost:9000&lt;/value&gt;\n",
    "     &lt;/property&gt;\n",
    "&lt;/configuration&gt;\n",
    "</pre>\n",
    "\n",
    "You use the prefix \"hdfs://localhost:9000/\" in filenames to access HDFS files from this machine. From another machine you should replace \"localhost\" with the IP address of your instance. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<code>hdfs-site.xml</code> contains details of the hdfs service:\n",
    "\n",
    "<pre>&lt;configuration&gt;\n",
    "     &lt;property&gt;\n",
    "         &lt;name&gt;dfs.replication&lt;/name&gt;\n",
    "         &lt;value&gt;1&lt;/value&gt;\n",
    "     &lt;/property&gt;\n",
    "     &lt;property&gt;\n",
    "         &lt;name&gt;dfs.name.dir&lt;/name&gt;\n",
    "         &lt;value&gt;/data/hdfs/name&lt;/value&gt;\n",
    "     &lt;/property&gt;\n",
    "     &lt;property&gt;\n",
    "         &lt;name&gt;dfs.data.dir&lt;/name&gt;\n",
    "         &lt;value&gt;/data/hdfs/data&lt;/value&gt;\n",
    "     &lt;/property&gt;\n",
    "&lt;/configuration&gt;\n",
    "</pre>\n",
    "\n",
    "This configuration defines a replication factor of 1 for files (i.e. only one rather than multiple copies are kept), and names for the data directories on the machine where HDFS data is kept. The \"name\" directory is use by the namenode to keep track of file blocks and other meta data. The \"data\" directory contains the contents of those blocks. In a full distributed implementation, the name node and name directory would be on one machine while the data node service and data directories would be on multiple other machines. \n",
    "\n",
    "Make sure that the directories specified above, i.e. <code>/data/hdfs/name</code> and <code>/data/hdfs/data</code> exist and are writable. "
   ]
  },
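  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If those directories don't exist yet, here is a sketch of one way to create them (this assumes HDFS runs as <code>ec2-user</code>; adjust the owner if your setup differs):\n",
    "\n",
    "<pre>\n",
    "sudo mkdir -p /data/hdfs/name /data/hdfs/data\n",
    "sudo chown -R ec2-user:ec2-user /data/hdfs\n",
    "</pre>"
   ]
  },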
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Although we wont be using mapreduce this time, its useful to do this configuration. \n",
    "\n",
    "In <code>mapred-site.xml</code>\n",
    "\n",
    "\n",
    "<pre>&lt;configuration&gt;\n",
    "     &lt;property&gt;\n",
    "         &lt;name&gt;mapred.job.tracker&lt;/name&gt;\n",
    "         &lt;value&gt;hdfs://localhost:9001&lt;/value&gt;\n",
    "     &lt;/property&gt;\n",
    "&lt;/configuration&gt;\n",
    "</pre>\n",
    "\n",
    "which specifies that the job tracker service should run on port 9001 if you start mapreduce."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Hadoop uses ssh for inter-server communication, even on the same machine. Try doing:\n",
    "\n",
    "<pre>\n",
    "ssh localhost\n",
    "</pre>\n",
    "\n",
    "It will probably throw an error because the ssh key for ec2-user has been changed. That's OK. Go ahead and remove the \"known_hosts\" file \n",
    "<pre>\n",
    "rm ~/.ssh/known_hosts\n",
    "</pre>\n",
    "and try again. It should work this time. You will be logged into your machine (recursively) twice, so just do:\n",
    "<pre>\n",
    "exit\n",
    "</pre>\n",
    "to return to a logged-in-once prompt. "
   ]
  },
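  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If <code>ssh localhost</code> still asks for a password, your instance may not have passwordless ssh set up. A standard sketch for enabling it (only needed if you actually hit this problem):\n",
    "\n",
    "<pre>\n",
    "ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa\n",
    "cat ~/.ssh/id_rsa.pub &gt;&gt; ~/.ssh/authorized_keys\n",
    "chmod 0600 ~/.ssh/authorized_keys\n",
    "</pre>"
   ]
  },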
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Formatting HDFS\n",
    "Just like a normal file system, you have to format HDFS before its first use. Make sure that the hadoop binary directories are in your path (/opt/hadoop/bin and /opt/hadoop/sbin) if not edit your <code>.bash_profile</code> so that they are. \n",
    "\n",
    "To format hdfs, do\n",
    "<pre>\n",
    "hadoop namenode -format\n",
    "</pre>\n",
    "\n",
    "You should only need to do this once, unless your HDFS get corrupted somehow. If your FS does get corrupted, remove everything in the data directories first before reformatting. \n",
    "\n",
    "Just like formatting a disk, this step will destroy any data you have saved in HDFS. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start HDFS"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To start the hdfs service do\n",
    "<pre>\n",
    "cd $HADOOP_PREFIX\n",
    "sbin/start-dfs.sh\n",
    "</pre>\n",
    "and say \"yes\" if it prompts you to add the host \"0.0.0.0\" (localhost by another name) to the list of known hosts. "
   ]
  },
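  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick sanity check (assuming the JDK's <code>jps</code> tool is on your path), list the running Java processes:\n",
    "\n",
    "<pre>\n",
    "jps\n",
    "</pre>\n",
    "\n",
    "After a successful start you should see <code>NameNode</code>, <code>DataNode</code> and <code>SecondaryNameNode</code> in the output."
   ]
  },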
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exploring HDFS\n",
    "\n",
    "Now you can explore the hadoop filesystem. The list of file system commands is\n",
    "available <a href=\"https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html\">here</a>\n",
    "\n",
    "Initially there will be nothing there, so lets create a directory:\n",
    "\n",
    "<pre>\n",
    "hadoop fs -mkdir /mydata\n",
    "</pre>\n",
    "\n",
    "and then to look at the results do \n",
    "\n",
    "<pre>\n",
    "hadoop fs -ls /\n",
    "</pre>\n",
    "\n",
    "From <a href=\"https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html\">the documentation</a> you can see that most Unix file system commands are available, except those that relate to a working directory (like \"cd\"). HDFS doesnt persist any state between commands so there is no notion of a working directory. So you always need to give it absolute paths to files and directories. \n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Adding a Data File to HDFS\n",
    "\n",
    "Next lets add a large data file to HDFS. cd to the directory <code>/data/rcv1</code>. This directory contains the RCV1 news article data, including raw text files. There are four files \n",
    "\n",
    "<pre>\n",
    "lyrl2004_tokens_test_pt0.dat.gz \n",
    "lyrl2004_tokens_test_pt1.dat.gz \n",
    "lyrl2004_tokens_test_pt2.dat.gz \n",
    "lyrl2004_tokens_test_pt3.dat.gz \n",
    "</pre>\n",
    "\n",
    "Unzip these files, and concatenate them into a single text file called all.txt. Upload this file to hdfs with:\n",
    "\n",
    "<pre>\n",
    "hadoop fs -put all.txt /mydata/rcv1_raw.txt\n",
    "</pre>"
   ]
  },
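  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is a sketch of one way to build <code>all.txt</code>. It assumes you have write permission in the current directory; if not, write the output somewhere you do (e.g. your home directory) and adjust the path in the <code>-put</code> command accordingly.\n",
    "\n",
    "<pre>\n",
    "cd /data/rcv1\n",
    "gunzip -c lyrl2004_tokens_test_pt*.dat.gz &gt; all.txt\n",
    "</pre>\n",
    "\n",
    "<code>gunzip -c</code> decompresses to standard output, so the original .gz files are left in place."
   ]
  },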
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then doing \n",
    "<pre>\n",
    "hadoop fs -ls /mydata\n",
    "</pre>\n",
    "should produce an output like\n",
    "\n",
    "<pre>\n",
    "Found 1 items\n",
    "-rw-r--r--   1 ec2-user supergroup  608585485 2015-11-06 16:33 /mydata/rcv1_raw.txt\n",
    "</pre>\n",
    "\n",
    "which shows a file of 608 MB. Check that your own file's size matches."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Where does Hadoop store the Data?\n",
    "\n",
    "Remember that you specifed namenode and data directories in <code>/data/hdfs/{name,data}</code>. The data directory does indeed contain your data. Browse down the directory hierarchy until you find a directory containing large blocks of data. You may find it helpful to use the \"du\" command to find which directories have large contents under them. \n",
    "\n",
    "What size are the file blocks in your HDFS? "
   ]
  },
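  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For example, a sketch of using <code>du</code> to see where the bulk of the data sits (sizes are reported in KB; the exact directory layout under <code>/data/hdfs/data</code> depends on the Hadoop version):\n",
    "\n",
    "<pre>\n",
    "du -k /data/hdfs/data | sort -n | tail\n",
    "</pre>\n",
    "\n",
    "Then <code>ls -lh</code> the largest directory it reports to see the sizes of the individual block files."
   ]
  },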
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Configuring Spark\n",
    "\n",
    "Configuring Spark for distributed computing is similar. First, you need to make sure that the shell variable <code>SPARK_HOME</code> points to <code>/opt/spark</code>. Edit your .bash_profile so this is true and call \n",
    "<pre>\n",
    "source ~/.bash_profile\n",
    "</pre> \n",
    "in your ec2 connection to make sure that it gets set in the current environment. \n",
    "\n",
    "The subdirectory <code>$SPARK_HOME/conf</code> contains configuration files. Change to this directory. There are templates for the configuration files which only need a few modifications so copy these to the actual config file names:\n",
    "\n",
    "<pre>\n",
    "cp slaves.template slaves\n",
    "cp spark-env.sh.template spark-env.sh\n",
    "cp spark-defaults.conf.template spark-defaults.conf\n",
    "</pre>\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Starting Spark\n",
    "\n",
    "To start Spark, cd to the main spark directory <code>/opt/spark</code> and do\n",
    "\n",
    "<pre>\n",
    "sbin/start-all.sh\n",
    "</pre>\n",
    "\n",
    "You should now be able to view the Spark Master console by pointing your local (laptop) browser at\n",
    "\n",
    "<pre>\n",
    "http://localhost:8080\n",
    "</pre>\n",
    "\n",
    "if you have trouble, make sure you created the SSH tunnel mentioned in \"Setup\" at the start of this HW. "
   ]
  },
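  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As with HDFS, <code>jps</code> gives a quick sanity check that the standalone Spark daemons came up (again assuming the JDK's <code>jps</code> tool is on your path):\n",
    "\n",
    "<pre>\n",
    "jps\n",
    "</pre>\n",
    "\n",
    "You should see a <code>Master</code> and a <code>Worker</code> process in the output, alongside the HDFS daemons started earlier."
   ]
  },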
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## The Spark Master Console\n",
    "\n",
    "The Spark console on <code>http://localhost:8080</code> will tell you where the Spark service is running (normally port 7077) and various other useful information. With this info, we can connect to the Spark service from IPython. \n",
    "\n",
    "The Master Console also contains links to a Console for the main Executor and the Spark Application console. These links point directly to the corresponding ports on your EC2 instance. But since they're behind a firewall you cant access them through these links. However, since we made tunnels before you can access them through the tunnels:\n",
    "\n",
    "**Spark Executor Console** is at <code>http://localhost:8081</code>. This contains information about executors, like how much memory is allocated for each one. Open a browser window to this URL (you can click on the link in this notebook) and take a look. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using IPython to connect to Spark\n",
    "\n",
    "**NOTE:** We will connect to Spark directly from this running Ipython notebook. You dont have to copy cells into Pyspark. So execute the following cell directly to set Python paths to the PySpark libraries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import sys\n",
    "import os\n",
    "spark_home = os.environ.get('SPARK_HOME', None)\n",
    "sys.path.insert(0, os.path.join(spark_home, 'python'))\n",
    "sys.path.insert(0, os.path.join(spark_home, 'python/pyspark'))\n",
    "sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then we create a Spark Context that wraps the Spark Server. **DO THIS ONLY ONCE PER SESSION**. The server wont allow you to have multiple Spark contexts. The Spark application server is on port 7077 and we get the local host name with a call to platform.node()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.context import SparkContext\n",
    "import platform\n",
    "import atexit\n",
    "\n",
    "nodename = platform.node()\n",
    "sc = SparkContext(master=\"spark://\"+nodename+\":7077\", appName=\"hw7\")\n",
    "atexit.register(lambda: sc.stop())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "###Aside: Running Remotely\n",
    "\n",
    "You could also run your queries in an IPython shell running on another machine. You would change the URL to \n",
    "<pre>\n",
    "master=\"spark://&lt;machinename&gt;:7077\"\n",
    "</pre>\n",
    "after making sure that this port was open between the two machines.\n",
    "\n",
    "\n",
    "###Aside: IPython Spark Profile\n",
    "You can also create a special ipython profile containing the above initialization. Some instructions for doing that are given <a href=\"https://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/\">here</a>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "###Continuing\n",
    "\n",
    "Lets check that things are working before going further. Lets load a local file (make sure you fixed permissions as in Lab 9). "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "a = sc.textFile(\"/data/MovieLens/movies.dat\")\n",
    "a.count()  # There should be 10681 movies"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "Now lets see if we can import a large file from hdfs:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "raw = sc.textFile(\"hdfs://localhost:9000/mydata/rcv1_raw.txt\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Which runs as usual at lightning speed. But that's because the copy didnt happen yet. To make Spark really copy we compute the size again."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "raw.count()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##Spark and HDFS Integration\n",
    "\n",
    "Although we could have used any HDFS URI in the call to \"textFile\" above, Spark is optimized to run with its executors on the slave nodes of an HDFS cluster. When used in that mode, Spark will pull data primarily from local files in HDFS reducing the network load."
   ]
  },
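  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a small illustration (a sketch, not part of the required work), you can ask the RDD how many partitions Spark created for the HDFS file; for an RDD read from HDFS this typically corresponds to the number of HDFS blocks:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Number of partitions in the RDD backed by the HDFS file;\n",
    "# for textFile() on HDFS this usually matches the number of blocks.\n",
    "raw.getNumPartitions()"
   ]
  },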
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## The Spark Application Console\n",
    "\n",
    "Now look at the Spark application console which is at http://localhost:4040 (Note: there is a link to this URL from the Spark master console but it won't work because its on the wrong side of the tunnels that we made). \n",
    "\n",
    "Click on the line for job you just ran (it should be the top line). \n",
    "\n",
    "Look at the stages of the job. You will see some summary metrics for it. \n",
    "\n",
    "Click also on the \"storage\" tab, what do you see? \n",
    "\n",
    "You'll probably be surprised, but that's because \"storage\" here refers to Spark's memory cached storage. There isnt any yet, because we havent asked Spark to cache anything. Lets do that now:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "raw.cache()\n",
    "raw.take(10)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now go back to the application console, click on the \"storage\" tab if you're not there already and refresh the browser window. What do you see this time?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Your Mission\n",
    "\n",
    "Now that Spark and HDFS are up and running, your task should be quite easy. Write a series of Spark commands to do the following:\n",
    "\n",
    "* Filter the \"raw\" RDD to remove lines starting with \".I\" or \".W\"\n",
    "\n",
    "* Split each line at whitespace and contatenate all the words into a single RDD of Strings which are words. \n",
    "\n",
    "* Make an RDD of (word, 1) pairs from the last RDD of Strings.\n",
    "\n",
    "* Reduce the last RDD into (word, count) pairs such that each word is unique. \n",
    "\n",
    "* Count the number of unique words. \n",
    "\n",
    "* Sort the last RDD by count in descending order.\n",
    "\n",
    "* Make a loglog plot of word frequency vs rank for the last RDD. You'll need to convert it to an np:array. \n",
    "\n",
    "* Save the last word/count RDD to HDFS as \"hdfs://localhost:9000/mydata/rcv1_counts\"\n",
    "\n",
    "* Include code cells below this one containing all your code. Execute the cells, and include the output. \n",
    "\n",
    "**TIPS:** Make sure you check your intermediate results after each step. e.g. use the \"take()\" method to look at the first few rows in each RDD. This will also force execution of any pending Spark queries, so that running time is spread out over your session."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##Review the Application Console\n",
    "\n",
    "This time the application console will contain information about every stage in the calculation you ran.\n",
    "\n",
    "Take another look. The Event Timeline is particularly helpful for analyzing performance. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Submit\n",
    "\n",
    "Submit this notebook to the <a href=\"https://bcourses.berkeley.edu/courses/1377158/assignments/7009110\">HW7 submission link</a>."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Stop Spark\n",
    "\n",
    "You can control-C from the terminal window where you started ipython to stop the notebook browser and the Spark context it contains. To stop Spark proper, cd to <code>$SPARK_HOME</code> and do \n",
    "\n",
    "<pre>\n",
    "sbin/stop-all.sh\n",
    "</pre>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Stop HDFS\n",
    "\n",
    "Change to the <code>$HADOOP_PREFIX</code> directory and do:\n",
    "\n",
    "<pre>\n",
    "sbin/stop-dfs.sh\n",
    "</pre>\n",
    "\n",
    "*Note* in theory you dont need to do this if you are going to use HDFS next time. HDFS runs as a service and should start automatically the next time your instance starts. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Optional: Write in Scala"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can do this exercise in Scala from a spark-shell prompt. To do this, shutdown any IPython notebooks. \n",
    "\n",
    "* Make sure Spark and HDFS have been started as per the instructions above. \n",
    "\n",
    "* Start Spark shell and point it at the master service like this:\n",
    "\n",
    "<pre>\n",
    "cd $SPARK_HOME\n",
    "bin/spark-shell --master spark://${HOSTNAME}:7077 \n",
    "</pre>\n",
    "\n",
    "* From there follow the sequence of steps in \"Your Mission\" above. You may find it convenient to develop your queries in a script (a text file of scala lines) that you load with the <code>:load filename</code> command from the Spark prompt.\n",
    "\n",
    "* Make a note of the time it takes to perform each step in the process using the Spark Application Console, which is once again on <code>http://localhost:4040</code>\n",
    "\n",
    "* Rerun the PySpark notebook and note the times for each step. Do you see a difference? "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "##Debugging\n",
    "\n",
    "You should have enough information to debug most problems that might happen. \n",
    "* Make a note of where the log files are stored (printed when starting hadoop and Spark services), and look at them if needed. \n",
    "* Make sure the port numbers are what you expect. \n",
    "* HDFS can sometimes get into a strange state. To restart HDFS, make sure to stop it first, **remove all the data files by hand**, then format the namenode and restart the hdfs service.\n",
    "* Go one step at a time, and make sure you verify that each step is working before continuing."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}