{ "metadata": { "name": "", "signature": "sha256:8465e297fa66fa45215065bb6a38d0c140f3a22b57cf27f29da09bd341d966d8" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[![](https://bytebucket.org/davis68/resources/raw/f7c98d2b95e961fae257707e22a58fa1a2c36bec/logos/baseline_cse_wdmk.png?token=be4cc41d4b2afe594f5b1570a3c5aad96a65f0d6)](http://cse.illinois.edu/)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# The MapReduce Algorithm\n", "\n", "## Contents\n", "- [Components of MapReduce](#components)\n", " - [Map](#map)\n", " - [Reduce](#reduce)\n", " - [MapReduce](#mapreduce)\n", "- [Apache Hadoop](#hadoop)\n", " - [Local Execution](#hadoop-local)\n", " - [Cloud Execution](#hadoop-cloud)\n", "- [Resources](#resources)\n", "- [Credits](#credits)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Components of MapReduce\n", "\n", "The MapReduce algorithm consists of two separate steps. MR describes a common problem decomposition for classes of models where interprocess communication is uncommon. In the first place, MR is the programming model itself, described by its component parts below. In the second place, MR is a particular platform or implementation, of which [Apache Hadoop](https://hadoop.apache.org/) is the predominant exemplar today, as well as the proximate cause of this tutorial." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "### Map\n", "\n", "In functional programming, `Map()` denotes the mapping of a set of objects onto another set or configuration via a filtering or sorting function. Essentially, one takes a collection of items and a higher-order function, applying the function to each element of the collection to yield a new derivative collection.\n", "\n", "![]()\n", "\n", "For instance, in Python we may write the following:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def count_letters(string):\n", " return len(string)\n", "\n", "words = ['Earth', 'could', 'not', 'answer', 'nor', 'the', 'Seas', 'that', 'mourn']\n", "list(map(count_letters,words))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "### Reduce\n", "\n", "In functional and parallel programming, `Reduce()` essentially takes a collection of data and transforms them into a single value or derivative data structure descriptive of the whole (such as a product or sum). (This is an oversimplification, but is the piece we need to understand Hadoop.)\n", "\n", "At each element of the data structure, `reduce` passes the current result and the next item from the collection into the function argument.\n", "\n", "In Python, for instance, we could recursively multiply across a list:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from functools import reduce\n", "\n", "def product(x, y):\n", " return x*y\n", "\n", "reduce(product, [1, 2, 3, 4] )" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "More interestingly, we can build tools to, say, find the least common multiple of three numbers ([src](https://stackoverflow.com/questions/147515/least-common-multiple-for-3-or-more-numbers#147539)):" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def gcd(a, b):\n", " \"\"\"Return greatest common divisor of two numbers.\"\"\"\n", " while b: \n", " a, b = b, a % b\n", " return a\n", "\n", "def lcm(a, b):\n", " \"\"\"Return lowest common multiple of two numbers.\"\"\"\n", " return a * b // gcd(a, b)\n", "\n", "def lcmm(*args):\n", " \"\"\"Return lowest common multiple of three or more numbers.\"\"\"\n", " return reduce(lcm, args)\n", "\n", "lcmm(100, 23, 98)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "(In the final analysis, `reduce` is not terribly common as a standalone function call in Python or many other nonfunctional programming languages. This is because equivalent functionality is built into objects, such as `string.join` or list comprehensions.)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "### MapReduce\n", "\n", "So why do we use _this_ model to describe our problems when using a cloud computing platform? Basically, communication is rare and expensive enough that we heavily penalize its use; in addition, most of the problems useful to data analysts are very different than those of interest to numerical computational scientists: they typically require asking questions about data sets rather than iteratively solving linear algebraic problems.\n", "\n", "At each step, reduce passes the current product or division, along with the next item from the list, to the passed-in lambda function. By default, the first item in the sequence initialized the starting value.\n", "\n", "So let's combine `map` and `reduce` from Python in two steps to see what that gains us." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Example: Word Count\n", "\n", "The following block of code implements an MR algorithm to count the frequency of words in a text, here `beowulf.txt`. The `ReadInput()`, `Map()` and `Reduce()` components are defined first, and then invoked directly below.\n", "\n", "One interesting feature of this implementation is that although it _works_, it's not terribly elegant. That is, in Python at least, you ought to just count up the total number of words _when you read them from the file_, rather than counting one for each word and later summing them up. However, this disjointed Map/Reduce approach starts to make a lot more sense when working on terabytes of data across hundreds of processors." ] }, { "cell_type": "code", "collapsed": false, "input": [ "# Input\n", "def ReadInput(filename):\n", " with open(filename, 'r') as text:\n", " return text.read().split()\n", "\n", "# Map step: count one for each word\n", "def Map(word_list):\n", " word_counts = []\n", " # Read input\n", " for word in word_list:\n", " word_counts.append((word, 1))\n", " return word_counts\n", "\n", "# Reduce step: sum up the count for each word\n", "def Reduce(partial_counts):\n", " total_count = {}\n", " for (word, count) in partial_counts:\n", " if word in total_count:\n", " total_count[word] += count\n", " else:\n", " total_count[word] = count\n", " return total_count" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "text = ReadInput('beowulf.txt')\n", "partial_counts = Map(text)\n", "total_counts = Reduce(partial_counts)\n", "\n", "# Output results in sorted order, most to least common\n", "import operator\n", "print(sorted(total_counts.items(), key=operator.itemgetter(1))[::-1])" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Other Elements of MapReduce\n", "\n", "Now, we are still simplifying things a bit from the _full_ MapReduce algorithm. That includes the following formal steps:\n", "- **Input reader**\u2014read off data in appropriately-sized chunks.\n", "- **Map function**\u2014process key/value pairs into one or more key/value pairs.\n", "- **Partition function**\u2014move data from the `Map` node which produced it to the `Reduce` node which requires it\n", "- **Compare function**\u2014sort the data to be reduced on each node\n", "- **Reduce function**\u2014reduce the key/value pairs into final outputs\n", "- **Output writer**\u2014write the data to a stable output location\n", "\n", "The most common MapReduce implementation, Apache Hadoop, allows you to explicitly specify procedures to handle each of these stages." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Apache Hadoop\n", "\n", "[Apache Hadoop](https://hadoop.apache.org/) is an open-source software framework used to process large data sets at scale on commodity hardware. It can be used serially on a single machine, or it can scale to tens of thousands of nodes on a cloud computing service like [Amazon Web Services](http://aws.amazon.com/). Despite this parallelism, however, MapReduce is [not guaranteed to be fast](https://en.wikipedia.org/wiki/MapReduce#Performance_considerations) or efficient\u2014it simply provides a robust and scalable paradigm for relatively computationally intensive processes which fit its conceptual model.\n", "\n", "Formally, there are two major pieces of Hadoop: the *H*adoop *D*istributed *F*ile *S*ystem (HDFS) and Hadoop MapReduce. HDFS implements a file system accessible across a commodity cluster and resilient to hardware failures. Hadoop MapReduce consists of a service to implement the MapReduce algorithm. MR is typically invoked either through Java or at a higher level through specialty languages like Pig and Hive.\n", "\n", "The HDFS provides a _namenode_ which contains metadata about all of the files on the system, as well as _datanodes_ for actually processing the data in the job. The MapReduce algorithm provides a _job tracker_ and _task trackers_ to administer scripts.\n", "\n", "![](https://bytebucket.org/davis68/data/raw/b11db837ab2b8de32f06d8565ba9b8046e964ada/lessons/img/hadoop-nodes.png?token=0a7a9310a77312d67f558a7c5167773d8b1d9f2a)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "### Hadoop Streaming\n", "\n", "Hadoop is written in Java, and thus natively works best with that language. (In fact, Java modules are directly compatible with the Hadoop interface.)\n", "\n", "However, we won't even assume that you know any Java in this workshop, because we will utilize the [_Hadoop Streaming_](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html) interface to execute code written in C, Python, Perl, Ruby, or every other language that can write to standard output and read from standard input. To understand the logistics of this, consider two shell scripts, `mapper.sh` and `reducer.sh` which can be executed in a pipe chain thus:\n", "\n", " cat inputFile | mapper . sh | reducer . sh > outputFile\n", "\n", "To execute properly, we clearly require each script to read incoming data directly from `stdin` and write the results of the process to `stdout`. The main difference from Hadoop Streaming is that this version is explicitly local and serial, with all data piped through a single process chain. Hadoop Streaming, in contrast, will partition and deliver the data in chunks to each process automatically. Execution can be as simple as entering:\n", "\n", " hadoop jar $HADOOP_HOME/mapred/contrib/streaming/hadoop-streaming.jar \\\n", " -input \\\n", " -output \\\n", " -mapper \\\n", " -reducer \n", "\n", "What we need to provide is a mapper and a reducer which conform to Hadoop's expectations. In this case, the mapper shall output a key/value pair delimited by a \u21b9 or `\\t` character. (From our earlier Python example, this would be the word followed by `\\t` followed by a number 1.) The framework shall shuffle these values into an equal or lesser number of processes for the reducer script, which shall then map the key/value pairs it receives into output. (In the word count example, now we take the words put into this process and sum them up into the final result.)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "### Standalone (Serial) Execution\n", "\n", "Recall that Hadoop has two components, HDFS and MapReduce. We need to start an HDFS system and place our data on it so that the MapReduce component can operate on the data. We then execute our MapReduce code on the Hadoop system.\n", "\n", "However, for testing purposes we can forgo these steps, and can just run MapReduce with our code to make sure that it's working as a standalone program (*i.e.*, without needing HDFS at this point)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### A \"Local\" Platform for Hadoop\n", "\n", "Hadoop doesn't come as standard on your machine, and installation is nontrivial enough to dissuade us from using it during a workshop. Thus we will utilize a remote machine which you can log into with the following command (change `csetrainingXX` to your login ID). If you have Hadoop installed locally, the steps should be the same as those below, with the exception of not needing to `ssh` to a different machine.\n", "\n", " $ ssh -Y -l csetrainingXX ec2-54-173-188-243.compute-1.amazonaws.com" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Script Workflow\n", "Our basic workflow for processing data within a `Map` or `Reduce` script often looks like the following.\n", "\n", "1. Load data into memory, whether from disk or `stdin`\n", "1. Preprocess data\n", " 1. Deal with missing or erroneous values\n", " 1. Filter noise\n", " 1. Define interpolation scheme, if any\n", "1. Process data\n", " 1. Identify events\n", " 2. Ask questions: clustering, correlations, etc.\n", "1. Integrate across data sets\n", "1. Postprocess data\n", " 1. Visualization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "#### Example: Word Count Example\n", "\n", "First, we'll take a simple text occurrence counting example using a local Hadoop implementation following the earlier workflow pattern.\n", "\n", "- If you have not yet logged in to the remote cluster, please do so:\n", " ssh -Y -l csetrainingXX ec2-54-173-188-243.compute-1.amazonaws.com\n", "\n", "- Take a look around: `pwd`, `ls /usr/local`, etc. This is a regular machine with Hadoop installed on it.\n", "\n", "- Now we will execute a word count example using a script supplied with Hadoop.\n", " mkdir input\n", " cp /usr/local/text/beowulf.txt ./input/\n", " $HADOOP_PREFIX/bin/hadoop jar $HADOOP_PREFIX/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.1.jar grep input output '\u00fe\u00e6t'\n", "\n", "- Finally, examine the resulting output. (Note that the last quoted text on the right of the Hadoop command is a regular expression, so you can search for more complex elements than we did here.)\n", " cat output/*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "#### Example: Electric Grid Data\n", "\n", "A more involved data processing example demonstrates a few more steps in the workflow, such as data cleaning and filtering as well as event identification.\n", "\n", "*P*hasor *M*easurement *U*nits (PMUs) record the frequency, voltage, and amperage of an electric power line several times a second. They may be installed anywhere from the utility customer's home to the high-voltage transmission lines. We have available to us a reasonably large data set provided by [TCIPG](http://tcipg.org/) on campus. These data files describe the electric line's status at 0.2-second intervals for more than two years, yielding more than 200 GB of data for less than a dozen homes.\n", "\n", "Recall that the MapReduce algorithm is only well-adapted to describe certain categories of parallel problems, ones which require very little interthread communication. The `Map` operation filters and sorts the data in preparation for the `Reduce` procedure, which carries out some summary process. We are thus limited in the range of queries we can make in the data (or, alternatively, we can iterate the `Map` and `Reduce` phases to effect a more complicated analysis). So for what kind of queries on these data would it be favorable to use MapReduce?\n", "\n", "- Power excursions\n", "- Times when $x$ sources of data are active\n", "- Ranges over which data exist\n", "\n", "##### The Data\n", "\n", "A typical data file looks like the following in comma-separated value (CSV) format:\n", "\n", "
Timestamp_YARDLEY:
F
_YARDLEY:
F_Quality
_YARDLEY:
DF
_YARDLEY:
DF_Quality
_YARDLEY:
S
_YARDLEY:
S_Quality
_YARDLEY-PM1:
UTKV
_YARDLEY-PM1:
UTKV_Quality
_YARDLEY-PA1:
UTKVH
_YARDLEY-PA1:
UTKVH_Quality
01-Apr-2014 13:46:11.30060.0027Good0Good0Good123.35Good-122.7439Good
01-Apr-2014 13:46:11.40060.0023Good0Good0Good123.35Good-122.6637Good
01-Apr-2014 13:46:11.50060.0024Good0Good0Good123.307Good-122.572Good
01-Apr-2014 13:46:11.60060.0035Good0Good0Good123.307Good-122.446Good
01-Apr-2014 13:46:11.70060.0026Good0Good0Good123.329Good-122.3428Good
01-Apr-2014 13:46:11.80060.0027Good0Good0Good123.329Good-122.2397Good
01-Apr-2014 13:46:11.90060.0028Good0Good0Good123.35Good-122.1366Good
01-Apr-2014 13:46:12.00060.0022Good0Good0Good123.372Good-122.0392Good
01-Apr-2014 13:46:12.10060.0021Good0Good0Good123.35Good-121.9647Good
\n", "\n", "(This goes on for tens of thousands of lines across hundreds of files.)\n", "\n", "Now, there are a lot of interesting features about these data we may want to compare externally (for instance, with other factors like climate or overall grid performance) or internally (are there hiccups or sudden variations in the data?). The data set is particularly messy, and when it gets too messy by a certain metric, we will simply throw out those data (arguably better methods may exist, although data quality standards for electric grid data are surprisingly thin on the ground).\n", "\n", "If we plot subsets of these data, this is what we find:\n", "\n", "![](https://bytebucket.org/davis68/data/raw/b11db837ab2b8de32f06d8565ba9b8046e964ada/lessons/img/pmu-voltage.png?token=f353c7eb0cd133882cc979973df1128582514bbf)\n", "\n", "![](https://bytebucket.org/davis68/data/raw/b11db837ab2b8de32f06d8565ba9b8046e964ada/lessons/img/pmu-frequency.png?token=ccc7e527b2319f08270f18b2db3c1603be8c9147)\n", "\n", "![](https://bytebucket.org/davis68/data/raw/b11db837ab2b8de32f06d8565ba9b8046e964ada/lessons/img/pmu-phase.png?token=2a0beaee08818e26a838b6778d3a9853589f31c4)\n", "\n", "##### The Script\n", "\n", "We will map the PMU value inputs to threshold events in the voltage fluctuation. In this case, we can either directly process these data with a Reduce() process, or we can write the data to disk and use them at a later time in another program. For a large data set, the main idea would be to quickly and efficiently yield the threshold events either live or for postmortem analysis. (Incidentally, to do this _live_, you would use [Apache Storm](https://storm.incubator.apache.org/), not Hadoop. The concepts are similar.) These values can then be broken out separately and studied for internal and external correlations with other data. For instance, we can check to see if threshold events occur across more than one data source within a short period of time.\n", "\n", "In order to make this tractable, we will have to clean and filter the data. First, notice in the above plots that there are gaps and misalignments in the data. We will simply strip these out by comparison with hard-coded well-understood limits (*i.e.*, frequency can't be negative and should be near 60 Hz). We will also apply a rolling-window median filter and test for threshold events if this value leaves the rolling-window standard deviation, which should be uncommon in regular operation. Now the data we deal with look like this:\n", "\n", "![](https://bytebucket.org/davis68/data/raw/b11db837ab2b8de32f06d8565ba9b8046e964ada/lessons/img/pmu-voltage-filtered.png?token=0f7373909abf0e97485baf89e9adb0eb5232ad54)\n", "\n", "So what we will do with Hadoop, in this case, is `Map` the data from the original format to a set of key/value pairs. The first part of the output, the key, will consist of time stamp and a field (so a co\u00f6rdinate entry in the original spreadsheet array); the second part, the value, will consist of an index in the standard deviation array and the corresponding peak value of the threshold event. We won't have a `Reduce` step\u2014or, equivalently, the `Reduce` step will consist of an identity operation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Execution\n", "\n", "###### Serial Script\n", "\n", "Examine the script `pmu_map.py` if you are interested in the details of the data analysis. We will first run it as a chained process to ensure that the input/output functions properly:\n", "\n", " cat /usr/local/data/01-APR-2014_13_46_11.300_TO_01-APR-2014_17_17_51.200.csv | python /usr/local/sbin/pmu_map.py | python /usr/local/sbin/pmu_reduce.py > results.txt\n", " cat results.txt\n", "\n", "That should work without any trouble, although it may take a few minutes since the data file is large. A more complex `Reduce` step could take these threshold events and sort them in order of severity or by site\u2014or, more complexly, try to correlate them into groups of likely relation (*i.e.*, a bump in power at one site followed by a similar bump a few seconds later somewhere else).\n", "\n", "###### Standalone Hadoop\n", "\n", "Now we can plug this into our Hadoop framework to see how Hadoop behaves:\n", "\n", " rm -rf results\n", " hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-2.5.1.jar \\\n", " -D mapreduce.job.reduces=0 \\\n", " -input /usr/local/data \\\n", " -output $HOME/results \\\n", " -mapper \"python /usr/local/sbin/pmu_map.py\" \\\n", " -reducer /bin/cat\n", " cat results/*\n", " \n", "A few observations:\n", "- Hadoop is extraordinarily verbose. Assuming you are working on a machine with a web browser, you can access certain locally hosted sites, such as http://localhost:50030, to monitor the job progress.\n", "- There are obviously several threads working in parallel here\u2014you can see the output progress for different files interwoven in `stdout`.\n", "- We are accessing files that are individually ~25 MB in size, split from the base size for the data, actually 125 MB. Hadoop naturally splits files at 64 MB per chunk, so we would have to handle an arbitrary division in our input, which this script is not adapted for.\n", "- You will also notice, if you examine `pmu_map.py`, that we handle `stdin` a little differently: we have to explicitly cycle back to it, check for new input, and end _only_ if there is nothing coming in. Hadoop doesn't necessarily provide the entire file input in one chunk to the `Map` or `Reduce` scripts. [(src)](http://stackoverflow.com/a/25313519/1272411)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### HDFS Setup\n", "\n", "HDFS consists of a _namenode_, which contains metadata about the files on the system, and a collection of _datanodes_, which execute the streaming process and MapReduce components. To run a distributed MapReduce problem, we need to start all of these, as well as stop them after we have completed our tasks. (We will utilize AWS\u2019s Elastic MapReduce platform next, which will automatically take care of this step for us.)\n", "\n", "1. Format the filesystem:\n", " $ hdfs namenode -format\n", "\n", "2. Start NameNode daemon and DataNode daemon:\n", " $ start-all.sh\n", "\n", "3. Browse the web interface for the NameNode, available at http://localhost:50070.\n", "\n", "4. Make the HDFS directories required to execute MapReduce jobs:\n", " $ hdfs dfs -mkdir /user/hduser/input\n", "\n", "5. Copy the input files into the distributed filesystem:\n", " $ hdfs dfs -copyFromLocal $HADOOP_HOME/etc/hadoop /user/hduser/input\n", "\n", "6. Run an example, one which here counts the number of instances of each string matching the regular expression:\n", "\n", " $ hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.0.jar\n", " $ grep /user/hduser/input/hadoop /user/hduser/output '<[a-z.]+>'\n", "\n", "7. Examine the output files.\n", "\n", " a. Copy output files from the distributed filesystem to the local filesystem and examine them:\n", " \n", " $ hdfs dfs -get output output\n", " $ cat output/*\n", "\n", " b. View output files on the distributed filesystem:\n", " $ hdfs dfs -cat output/*\n", "\n", "8. When done, stop the daemons with:\n", " $ stop-dfs.sh\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "### Cloud Execution\n", "\n", "A much more common and practical usage mode for Hadoop is on the cloud, whether [Microsoft Azure](http://azure.microsoft.com/en-us/), [Google Cloud Platform](https://cloud.google.com/), [Amazon Web Services](http://aws.amazon.com/), or several others. (There is even a pilot Hadoop project on Illinois\u2019s [Campus Cluster](http://campuscluster.illinois.edu) Program.) We will use AWS in this workshop.\n", "\n", "AWS offers a suite of cloud-based services. We need three of them to accomplish our purposes today: [Simple Scalable Storage](https://aws.amazon.com/s3/) (S3); [Elastic Cloud Compute](https://aws.amazon.com/ec2/) (EC2); and [Elastic MapReduce](https://aws.amazon.com/elasticmapreduce/) (EMR).\n", "\n", "- S3 provides the file system on the cloud service.\n", "- EC2 provides the virtual machine instances which simulate the physical hardware we will use.\n", "- EMR provides a Hadoop platform on EC2, which can also pull in our data from S3 automatically.\n", "\n", "#### Running a Hadoop Streaming Example on EMR\n", "\n", "1. **Log in** to the [AWS site for this workshop](https://uiuc-cse.signin.aws.amazon.com/console) with the username and password assigned you in the workshop.\n", "1. Next, **create an *A*mazon *M*achine *I*nstance (AMI)**. An AMI is essentially a virtual machine of a particular hardware and software configuration which remotely executes on the cloud. This VM can be a conventional workstation, a server, or another service. We'll demonstrate how an AMI works, then use EMR to create a virtual Hadoop cluster in the next step.\n", " ![](https://bytebucket.org/davis68/data/raw/7982eeb775d5805660304f00baf84130813a3d02/lessons/img/aws-ui-1.png?token=70dfc6367cec28d9180daa05961fcabb337f5d60)\n", " ![](https://bytebucket.org/davis68/data/raw/7982eeb775d5805660304f00baf84130813a3d02/lessons/img/aws-ui-2.png?token=93d9bee9e55491574f434092017b61b1a6442eaa)\n", " ![](https://bytebucket.org/davis68/data/raw/7982eeb775d5805660304f00baf84130813a3d02/lessons/img/aws-ui-3.png?token=f7c73a2d372a747322b00012d7a28d520da82ed9)\n", " 1. To create a job, select \u2018EC2\u2019 from the menu at the top and `Launch Instances` on the left. This will take you to a wizard which we will use to launch our jobs.\n", " 1. On the Tag page, set the instance name to `csetrainingXX`, where this is your login ID.\n", " 1. To monitor jobs, select \u2018EC2\u2019 from the menu at the top and `Monitor Instances` on the left. There you can see your machine instances running on the cloud and some data about their execution.\n", " 1. Now we can treat this machine instance as any remote server: we can `ssh` to it, treat it as a normal machine, and use it to host or process data. (This is actually what you were doing a moment ago with the remote stand-alone Hadoop example.)\n", " 1. Once you're done taking a look at it, please right-click on your instance and select \"Terminate instance\".\n", "1. Now we will **launch a Hadoop job using EMR**. Behind the scenes, this is automatically setting up and invoking Hadoop on EC2. It's easier for Hadoop because it always has an up-to-date configuration and launches the cluster of slave nodes automatically.\n", " 1. At the EMR dashboard, click \"Create cluster\".\n", " 1. You are now at the \"Cluster Configuration\" page. There are some very good standard examples available by clicking the \"Configure sample application\" button in the upper right, so do so now. Select \"Word Count\" from the drop-down of examples and then review the resulting settings on the main page.\n", " ![](https://bytebucket.org/davis68/data/raw/5996c9e3e4776eae89e602cbd8c6d015b6f77945/lessons/img/aws-emr-sample-wc.png?token=696d9f90549955bc80e4f7a34e1777a08d7bc466)\n", " 1. Make a few tweaks:\n", " 1. Select the Word Count standard example.\n", " 1. Set the name to your login ID. **Make sure to use the number of your user name instead of XX.**\n", " 1. Examine your hardware configuration to see what AWS/EMR has set up for you.\n", " 1. Click the `Edit` button to the right of the Word Count step at the bottom. Examine the command-line arguments and the format for setting up a Hadoop job here.\n", " 1. When you review and launch this job, a monitoring page will become available to you and you can track the progress (or lack thereof).\n", " ![](https://bytebucket.org/davis68/data/raw/7982eeb775d5805660304f00baf84130813a3d02/lessons/img/aws-emr-launch-job-pending.png?token=14faa5aa6311caa8c4712ccd64a11085226de1cc)\n", " 1. When the job is complete, review the logs (on this page under Steps) and the output in your S3 bucket." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Resources\n", "\n", "- [CSE Hadoop Handout](https://github.com/uiuc-cse/data-fa14/blob/gh-pages/lessons/hadoop-handout.pdf?raw=true)\n", "- [QwikLabs Intro to AWS EMR](https://run.qwiklabs.com/focuses/preview/1055?locale=en)\n", "- [Writing a Hadoop MapReduce Program in Python / Michael Noll](http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/)\n", "- [Hadoop Streaming Documentation](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopStreaming.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Credits\n", "\n", "Neal Davis developed these materials for [Computational Science and Engineering](http://cse.illinois.edu/) at the University of Illinois at Urbana\u2013Champaign.\n", "\n", "\n", "This content is available under a [Creative Commons Attribution 4.0 Unported License](https://creativecommons.org/licenses/by/4.0/).\n", "\n", "### Data Sets\n", "\n", "The PMU data set we used was generously provided by the Trustworthy Cyber Infrastructure for the Power Grid (TCIPG) laboratory at the University of Illinois. We are especially grateful to Tim Yardley for arranging its availability.\n", "\n", "\n", "The data set has been made available to users under a [Creative Commons Attribution 4.0 Unported License](https://creativecommons.org/licenses/by/4.0/).\n", "\n", "[![](https://bytebucket.org/davis68/resources/raw/f7c98d2b95e961fae257707e22a58fa1a2c36bec/logos/baseline_cse_wdmk.png?token=be4cc41d4b2afe594f5b1570a3c5aad96a65f0d6)](http://cse.illinois.edu/)" ] }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }