{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Mapreduce with bash \n", "\n", "In this notebook we're going to use `bash` to write a mapper and a reducer to count words in a file. This example will serve to illustrate the main features of Hadoop's MapReduce framework." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Table of contents\n", "- [What is MapReduce?](#mapreduce)\n", "- [The mapper](#mapper)\n", " - [Test the mapper](#testmapper)\n", "- [Hadoop it up](#hadoop)\n", " - [What is Hadoop Streaming?](#hadoopstreaming)\n", " - [List your Hadoop directory](#hdfs_ls)\n", " - [Test MapReduce with a dummy reducer](#dummyreducer)\n", " - [Shuffling and sorting](#shuffling&sorting)\n", "- [The reducer](#reducer)\n", " - [Test and run](#run)\n", "- [Run a mapreduce job with more data](#moredata)\n", " - [Sort the output with `sort`](#sortoutput)\n", " - [Sort the output with another MapReduce job](#sortoutputMR)\n", " - [Configure sort with `KeyFieldBasedComparator`](#KeyFieldBasedComparator)\n", " - [Specifying Configuration Variables with the -D Option](#configuration_variables)\n", " - [What is word count useful for?](#wordcount)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What is MapReduce? \n", "\n", "MapReduce is a computing paradigm designed to allow parallel distributed processing of massive amounts of data.\n", "\n", "Data is split across several computer nodes, there it is processed by one or more mappers. The results emitted by the mappers are first sorted and then passed to one or more reducers that process and combine the data to return the final result.\n", "\n", "![Map & Reduce](mapreduce.png)", "\n", "With [Hadoop Streaming](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html) it is possible to use any programming language to define a mapper and/or a reducer. Here we're going to use the Unix `bash` scripting language ([here](https://www.gnu.org/software/bash/manual/html_node/index.html) is the official documentation for the language)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The mapper \n", "Let's write a mapper script called `map.sh`. The mapper splits each input line into words and for each word it outputs a line containing the word and `1` separated by a tab.\n", "\n", "Example: for the input \n", "\n", "
\n", "apple orange\n", "banana apple peach\n", "\n", "\n", "\n", "`map.sh` outputs:\n", "\n", "
\n", "apple 1\n", "orange 1\n", "banana 1\n", "apple 1\n", "peach 1\n", "\n", "\n", "\n", "\n", "The _cell magic_ [`%%writefile`](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-writefile) allows us to write the contents of the cell to a file." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting map.sh\n" ] } ], "source": [ "%%writefile map.sh\n", "#!/bin/bash\n", "\n", "while read line\n", "do\n", " for word in $line \n", " do\n", " if [ -n \"$word\" ] \n", " then\n", " echo -e ${word}\"\\t1\"\n", " fi\n", " done\n", "done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After running the cell above, you should have a new file `map.sh` in your current directory. \n", "The file can be seen in the left panel of JupyterLab or by using a list command on the bash command-line.\n", "\n", "**Note:** you can execute a single bash command in a Jupyter notebook cell by prepending an exclamation point to the command." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rwx------ 1 datalab hadoopusers 126 Nov 18 08:49 map.sh\n" ] } ], "source": [ "!ls -hl map.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Test the mapper \n", "We're going to test the mapper on on the command line with a small text file `fruits.txt` by first creating the text file.\n", "In this file `apple` for instance appears two times, that's what we want our mapreduce job to compute." ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting fruits.txt\n" ] } ], "source": [ "%%writefile fruits.txt\n", "apple banana\n", "peach orange peach peach\n", "pineapple peach apple" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "apple banana\n", "peach orange peach peach\n", "pineapple peach apple\n" ] } ], "source": [ "!cat fruits.txt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Test the mapper" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "apple\t1\n", "banana\t1\n", "peach\t1\n", "orange\t1\n", "peach\t1\n", "peach\t1\n", "pineapple\t1\n", "peach\t1\n", "apple\t1\n" ] } ], "source": [ "!cat fruits.txt|./map.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the script `map.sh` does not have the executable bit set, you need to set the correct permissions." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "!chmod 700 map.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Hadoop it up \n", "Let us now run a MapReduce job with Hadoop Streaming. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### What is Hadoop Streaming \n", "\n", "Hadoop Streaming is a library included in the Hadoop distribution that enables you to develop MapReduce executables in languages other than Java. \n", "\n", "Mapper and/or reducer can be any sort of executables that read the input from stdin and emit the output to stdout. 
By default, input is read line by line and the prefix of a line up to the first tab character is the key; the rest of the line (excluding the tab character) will be the value.\n", "\n", "If there is no tab character in the line, then the entire line is considered the key and the value is null. The default input format is specified in the class `TextInputFormat` (see the [API documentation](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/TextInputFormat.html)), but this can be customized, for instance by defining another field separator (see the [Hadoop Streaming documentation](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Customizing_How_Lines_are_Split_into_KeyValue_Pairs)).\n", "\n", "This is an example of MapReduce streaming invocation syntax:\n", "\n", "
\n", " mapred streaming \\\n", " -input myInputDirs \\\n", " -output myOutputDir \\\n", " -mapper /bin/cat \\\n", " -reducer /usr/bin/wc\n", "\n", "\n", "\n", "\n", "You can find the full official documentation for Hadoop Streaming from Apache Hadoop here: [https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html).\n", "\n", "All options for the Hadoop Streaming command are described here: [Streaming Command Options](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Streaming_Command_Options) and can be listed with the command" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Usage: $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar [options]\n", "Options:\n", " -input
\"INFO mapreduce.Job: Job ... completed successfully\"\n", "\n", "**Note:** at the beginning of next cell you'll see a command `hadoop fs -rmr wordcount/output 2>/dev/null`. This is needed because when you run a job several times mapreduce will give an error if you try to overwrite the same output directory." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob6050042463478216172.jar tmpDir=null\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "19/11/18 08:49:53 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472\n", "19/11/18 08:49:53 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0251\n", "19/11/18 08:49:53 INFO mapred.FileInputFormat: Total input files to process : 1\n", "19/11/18 08:49:54 INFO mapreduce.JobSubmitter: number of splits:2\n", "19/11/18 08:49:54 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address\n", "19/11/18 08:49:54 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled\n", "19/11/18 08:49:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0251\n", "19/11/18 08:49:54 INFO mapreduce.JobSubmitter: Executing with tokens: []\n", "19/11/18 08:49:54 INFO conf.Configuration: resource-types.xml not found\n", "19/11/18 08:49:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.\n", "19/11/18 08:49:54 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0251\n", "19/11/18 08:49:54 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0251/\n", "19/11/18 08:49:54 INFO mapreduce.Job: Running job: job_1571899345744_0251\n", "19/11/18 08:50:03 INFO mapreduce.Job: Job job_1571899345744_0251 running in uber mode : false\n", "19/11/18 08:50:03 INFO mapreduce.Job: map 0% reduce 0%\n", "19/11/18 08:50:09 INFO mapreduce.Job: map 100% reduce 0%\n", "19/11/18 08:50:14 INFO mapreduce.Job: map 100% reduce 100%\n", "19/11/18 08:50:15 INFO mapreduce.Job: Job job_1571899345744_0251 completed successfully\n", "19/11/18 08:50:15 INFO mapreduce.Job: Counters: 54\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=79\n", "\t\tFILE: Number of bytes written=687174\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=312\n", "\t\tHDFS: Number of bytes written=78\n", "\t\tHDFS: Number of read operations=11\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\t\tHDFS: Number of bytes read erasure-coded=0\n", "\tJob Counters \n", "\t\tLaunched map tasks=2\n", "\t\tLaunched reduce tasks=1\n", "\t\tRack-local map tasks=2\n", "\t\tTotal time spent by all maps in occupied slots (ms)=474032\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=140348\n", "\t\tTotal time spent by all map tasks (ms)=9116\n", "\t\tTotal time spent by all reduce tasks (ms)=2699\n", "\t\tTotal vcore-milliseconds taken by all map tasks=9116\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=2699\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=46673920\n", "\t\tTotal 
megabyte-milliseconds taken by all reduce tasks=13818880\n", "\tMap-Reduce Framework\n", "\t\tMap input records=3\n", "\t\tMap output records=9\n", "\t\tMap output bytes=78\n", "\t\tMap output materialized bytes=108\n", "\t\tInput split bytes=222\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=5\n", "\t\tReduce shuffle bytes=108\n", "\t\tReduce input records=9\n", "\t\tReduce output records=9\n", "\t\tSpilled Records=18\n", "\t\tShuffled Maps =2\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=2\n", "\t\tGC time elapsed (ms)=139\n", "\t\tCPU time spent (ms)=2630\n", "\t\tPhysical memory (bytes) snapshot=1531457536\n", "\t\tVirtual memory (bytes) snapshot=18885754880\n", "\t\tTotal committed heap usage (bytes)=4521459712\n", "\t\tPeak Map Physical memory (bytes)=571957248\n", "\t\tPeak Map Virtual memory (bytes)=6292967424\n", "\t\tPeak Reduce Physical memory (bytes)=388386816\n", "\t\tPeak Reduce Virtual memory (bytes)=6301384704\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=90\n", "\tFile Output Format Counters \n", "\t\tBytes Written=78\n", "19/11/18 08:50:15 INFO streaming.StreamJob: Output directory: wordcount/output\n" ] } ], "source": [ "%%bash\n", "hdfs dfs -rm -r wordcount/output 2>/dev/null\n", "mapred streaming \\\n", " -files map.sh \\\n", " -input wordcount/input \\\n", " -output wordcount/output \\\n", " -mapper map.sh \\\n", " -reducer /bin/cat" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output of the mapreduce job is in the `output` subfolder of the input directory. Let's check what's inside it." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 2 items\n", "-rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:50 wordcount/output/_SUCCESS\n", "-rw-r--r-- 3 datalab supergroup 78 2019-11-18 08:50 wordcount/output/part-00000\n" ] } ], "source": [ "!hdfs dfs -ls wordcount/output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If `output` contains a file named `_SUCCESS` that means that the mapreduce job completed successfully.\n", "\n", "**Note:** when dealing with Big Data it's always advisable to pipe the output of `cat` commands to `head` (or `tail`)." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "apple\t1\n", "apple\t1\n", "banana\t1\n", "orange\t1\n", "peach\t1\n", "peach\t1\n", "peach\t1\n", "peach\t1\n", "pineapple\t1\n" ] } ], "source": [ "!hdfs dfs -cat wordcount/output/part*|head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have gotten as expected all the output from the mapper. Something worth of notice is that the data outputted from the mapper _**has been sorted**_. We haven't asked for that but this step is automatically performed by the mapper as soon as the number of reducers is $\\gt 0$." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Shuffling and sorting \n", "The following picture illustrates the concept of shuffling and sorting that is automatically performed by Hadoop after each map before passing the output to reduce. In the picture the outputs of the two mapper tasks are shown. 
The arrows represent shuffling and sorting done before delivering the data to one reducer (rightmost box).\n", "![Shuffle & sort](shuffle_sort.png)", "\n", "The shuffling and sorting phase is often one of the most costly in a MapReduce job.\n", "\n", "\n", "Note: the job ran with two mappers because $2$ is the default number of mappers in Hadoop. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The reducer \n", "Let's now write a reducer script called `reduce.sh`. " ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting reduce.sh\n" ] } ], "source": [ "%%writefile reduce.sh\n", "#!/bin/bash\n", "\n", "currkey=\"\"\n", "currcount=0\n", "while IFS=$'\\t' read -r key val\n", "do\n", " if [[ $key == $currkey ]]\n", " then\n", " currcount=$(( currcount + val ))\n", " else\n", " if [ -n \"$currkey\" ]\n", " then\n", " echo -e ${currkey} \"\\t\" ${currcount} \n", " fi\n", " currkey=$key\n", " currcount=1\n", " fi\n", "done\n", "# last one\n", "echo -e ${currkey} \"\\t\" ${currcount}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set permission for the reducer script." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "!chmod 700 reduce.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Test and run \n", "\n", "Test map and reduce on the shell" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "apple \t 2\n", "banana \t 1\n", "orange \t 1\n", "peach \t 4\n", "pineapple \t 1\n" ] } ], "source": [ "!cat fruits.txt|./map.sh|sort|./reduce.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once we've made sure that the reducer script runs correctly on the shell, we can run it on the cluster." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "packageJobJar: [map.sh, reduce.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob4511756596880012363.jar tmpDir=null\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "19/11/18 08:50:23 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "19/11/18 08:50:25 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472\n", "19/11/18 08:50:25 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0252\n", "19/11/18 08:50:26 INFO mapred.FileInputFormat: Total input files to process : 1\n", "19/11/18 08:50:26 INFO mapreduce.JobSubmitter: number of splits:2\n", "19/11/18 08:50:26 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address\n", "19/11/18 08:50:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. 
Instead, use yarn.system-metrics-publisher.enabled\n", "19/11/18 08:50:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0252\n", "19/11/18 08:50:26 INFO mapreduce.JobSubmitter: Executing with tokens: []\n", "19/11/18 08:50:26 INFO conf.Configuration: resource-types.xml not found\n", "19/11/18 08:50:26 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.\n", "19/11/18 08:50:26 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0252\n", "19/11/18 08:50:26 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0252/\n", "19/11/18 08:50:26 INFO mapreduce.Job: Running job: job_1571899345744_0252\n", "19/11/18 08:50:36 INFO mapreduce.Job: Job job_1571899345744_0252 running in uber mode : false\n", "19/11/18 08:50:36 INFO mapreduce.Job: map 0% reduce 0%\n", "19/11/18 08:50:43 INFO mapreduce.Job: map 100% reduce 0%\n", "19/11/18 08:50:49 INFO mapreduce.Job: map 100% reduce 100%\n", "19/11/18 08:50:50 INFO mapreduce.Job: Job job_1571899345744_0252 completed successfully\n", "19/11/18 08:50:50 INFO mapreduce.Job: Counters: 54\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=79\n", "\t\tFILE: Number of bytes written=687864\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=312\n", "\t\tHDFS: Number of bytes written=56\n", "\t\tHDFS: Number of read operations=11\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\t\tHDFS: Number of bytes read erasure-coded=0\n", "\tJob Counters \n", "\t\tLaunched map tasks=2\n", "\t\tLaunched reduce tasks=1\n", "\t\tRack-local map tasks=2\n", "\t\tTotal time spent by all maps in occupied slots (ms)=484692\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=241904\n", "\t\tTotal time spent by all map tasks (ms)=9321\n", "\t\tTotal time spent by all reduce tasks (ms)=4652\n", "\t\tTotal vcore-milliseconds taken by all map tasks=9321\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=4652\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=47723520\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=23818240\n", "\tMap-Reduce Framework\n", "\t\tMap input records=3\n", "\t\tMap output records=9\n", "\t\tMap output bytes=78\n", "\t\tMap output materialized bytes=108\n", "\t\tInput split bytes=222\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=5\n", "\t\tReduce shuffle bytes=108\n", "\t\tReduce input records=9\n", "\t\tReduce output records=5\n", "\t\tSpilled Records=18\n", "\t\tShuffled Maps =2\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=2\n", "\t\tGC time elapsed (ms)=158\n", "\t\tCPU time spent (ms)=2550\n", "\t\tPhysical memory (bytes) snapshot=1526378496\n", "\t\tVirtual memory (bytes) snapshot=18883096576\n", "\t\tTotal committed heap usage (bytes)=4490002432\n", "\t\tPeak Map Physical memory (bytes)=580694016\n", "\t\tPeak Map Virtual memory (bytes)=6292299776\n", "\t\tPeak Reduce Physical memory (bytes)=368877568\n", "\t\tPeak Reduce Virtual memory (bytes)=6298894336\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=90\n", "\tFile Output Format Counters \n", "\t\tBytes Written=56\n", "19/11/18 08:50:50 INFO streaming.StreamJob: 
Output directory: wordcount/output\n" ] } ], "source": [ "%%bash\n", "hdfs dfs -rm -r wordcount/output 2>/dev/null\n", "mapred streaming \\\n", " -file map.sh \\\n", " -file reduce.sh \\\n", " -input wordcount/input \\\n", " -output wordcount/output \\\n", " -mapper map.sh \\\n", " -reducer reduce.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check the output on the HDFS filesystem" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "apple \t 2\n", "banana \t 1\n", "orange \t 1\n", "peach \t 4\n", "pineapple \t 1\n" ] } ], "source": [ "!hdfs dfs -cat wordcount/output/part*|head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run a mapreduce job with more data \n", "\n", "Let's create a datafile by downloading some real data, for instance from a Web page. This example will be used to introduce some advanced configurations.\n", "\n", "Next, we download a URL with `wget` and filter out HTML tags with a `sed` regular expression." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "URL=https://www.derstandard.at/story/2000110819049/und-wo-warst-du-beim-fall-der-mauer\n", "wget -qO- $URL | sed -e 's/<[^>]*>//g;s/^ //g' >sample_article.txt" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\t1\n", "\t1\n", "\t1\n", "\t1\n", "Und\t1\n", "wo\t1\n", "warst\t1\n", "du\t1\n", "beim\t1\n", "Fall\t1\n" ] } ], "source": [ "!cat sample_article.txt|./map.sh|head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As usual, with real data there's some more work to do. Here we see that the mapper script doesn't skip empty lines. Let's modify it so that empty lines are skipped." ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting map.sh\n" ] } ], "source": [ "%%writefile map.sh\n", "#!/bin/bash\n", "\n", "while read line\n", "do\n", " for word in $line\n", " do\n", " if [[ \"$line\" =~ [^[:space:]] ]]\n", " then\n", " if [ -n \"$word\" ]\n", " then\n", " echo -e ${word} \"\\t1\"\n", " fi\n", " fi\n", " done\n", "done" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Und \t1\n", "wo \t1\n", "warst \t1\n", "du \t1\n", "beim \t1\n", "Fall \t1\n", "der \t1\n", "Mauer? \t1\n", "- \t1\n", " \t1\n" ] } ], "source": [ "!cat sample_article.txt|./map.sh|head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now the output of `map.sh` looks better!\n", "\n", "Note: when working with real data we need in general some more preprocessing in order to remove control characters or invalid unicode.\n", "\n", "Time to run MapReduce again with the new data, but first we need to \"put\" the data on HDFS." 
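, "\n", "\n", "As an aside on the preprocessing note above, a minimal sketch of such a cleaning step (assuming `iconv` is available on the command line; the output file name `sample_article_clean.txt` is only illustrative) could look like this:\n", "\n", "    # illustrative cleaning step, not needed for this article: drop invalid UTF-8,\n", "    # then delete control characters other than tab, newline and carriage return\n", "    iconv -f UTF-8 -t UTF-8 -c sample_article.txt |\n", "      tr -d '\\000-\\010\\013\\014\\016-\\037' > sample_article_clean.txt\n", "\n", "This article needs no such cleaning, so we keep using `sample_article.txt` as is."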
] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "hdfs dfs -rm -r wordcount/input 2>/dev/null\n", "hdfs dfs -put sample_article.txt wordcount/input" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-rw-r--r-- 3 datalab supergroup 4.1 K 2019-11-18 08:50 wordcount/input\n" ] } ], "source": [ "# check that the folder wordcount/input on HDFS only contains sample_article.txt\n", "!hdfs dfs -ls -h wordcount/input" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the reducer" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Und \t 1\n", "wo \t 1\n", "warst \t 1\n", "du \t 1\n", "beim \t 1\n", "Fall \t 1\n", "der \t 1\n", "Mauer? \t 1\n", "- \t 1\n", " \t 2\n" ] } ], "source": [ "!cat sample_article.txt|./map.sh|./reduce.sh|head" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "packageJobJar: [map.sh, reduce.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob1458624376542077747.jar tmpDir=null\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "19/11/18 08:51:03 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "19/11/18 08:51:04 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472\n", "19/11/18 08:51:04 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0253\n", "19/11/18 08:51:05 INFO mapred.FileInputFormat: Total input files to process : 1\n", "19/11/18 08:51:05 INFO mapreduce.JobSubmitter: number of splits:2\n", "19/11/18 08:51:05 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address\n", "19/11/18 08:51:05 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. 
Instead, use yarn.system-metrics-publisher.enabled\n", "19/11/18 08:51:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0253\n", "19/11/18 08:51:05 INFO mapreduce.JobSubmitter: Executing with tokens: []\n", "19/11/18 08:51:06 INFO conf.Configuration: resource-types.xml not found\n", "19/11/18 08:51:06 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.\n", "19/11/18 08:51:06 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0253\n", "19/11/18 08:51:06 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0253/\n", "19/11/18 08:51:06 INFO mapreduce.Job: Running job: job_1571899345744_0253\n", "19/11/18 08:51:14 INFO mapreduce.Job: Job job_1571899345744_0253 running in uber mode : false\n", "19/11/18 08:51:14 INFO mapreduce.Job: map 0% reduce 0%\n", "19/11/18 08:51:21 INFO mapreduce.Job: map 100% reduce 0%\n", "19/11/18 08:51:27 INFO mapreduce.Job: map 100% reduce 100%\n", "19/11/18 08:51:28 INFO mapreduce.Job: Job job_1571899345744_0253 completed successfully\n", "19/11/18 08:51:28 INFO mapreduce.Job: Counters: 54\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=2056\n", "\t\tFILE: Number of bytes written=691913\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=6428\n", "\t\tHDFS: Number of bytes written=2273\n", "\t\tHDFS: Number of read operations=11\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\t\tHDFS: Number of bytes read erasure-coded=0\n", "\tJob Counters \n", "\t\tLaunched map tasks=2\n", "\t\tLaunched reduce tasks=1\n", "\t\tData-local map tasks=2\n", "\t\tTotal time spent by all maps in occupied slots (ms)=482924\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=209040\n", "\t\tTotal time spent by all map tasks (ms)=9287\n", "\t\tTotal time spent by all reduce tasks (ms)=4020\n", "\t\tTotal vcore-milliseconds taken by all map tasks=9287\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=4020\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=47549440\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=20582400\n", "\tMap-Reduce Framework\n", "\t\tMap input records=191\n", "\t\tMap output records=211\n", "\t\tMap output bytes=2492\n", "\t\tMap output materialized bytes=2180\n", "\t\tInput split bytes=200\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=164\n", "\t\tReduce shuffle bytes=2180\n", "\t\tReduce input records=211\n", "\t\tReduce output records=164\n", "\t\tSpilled Records=422\n", "\t\tShuffled Maps =2\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=2\n", "\t\tGC time elapsed (ms)=146\n", "\t\tCPU time spent (ms)=2820\n", "\t\tPhysical memory (bytes) snapshot=1466261504\n", "\t\tVirtual memory (bytes) snapshot=18885009408\n", "\t\tTotal committed heap usage (bytes)=4507828224\n", "\t\tPeak Map Physical memory (bytes)=549105664\n", "\t\tPeak Map Virtual memory (bytes)=6292787200\n", "\t\tPeak Reduce Physical memory (bytes)=372985856\n", "\t\tPeak Reduce Virtual memory (bytes)=6300979200\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=6228\n", "\tFile Output Format Counters \n", "\t\tBytes Written=2273\n", "19/11/18 08:51:28 
INFO streaming.StreamJob: Output directory: wordcount/output\n" ] } ], "source": [ "%%bash\n", "hadoop fs -rmr wordcount/output 2>/dev/null\n", "mapred streaming \\\n", " -file map.sh \\\n", " -file reduce.sh \\\n", " -input wordcount/input \\\n", " -output wordcount/output \\\n", " -mapper map.sh \\\n", " -reducer reduce.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the output on HDFS" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 2 items\n", "-rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:51 wordcount/output/_SUCCESS\n", "-rw-r--r-- 3 datalab supergroup 2273 2019-11-18 08:51 wordcount/output/part-00000\n" ] } ], "source": [ "!hdfs dfs -ls wordcount/output" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This job took a few seconds and this is quite some time for such a small file (4KB). This is due to the overhead of distributing the data and running the Hadoop framework. \n", "The advantage of Hadoop can be appreciated only for large datasets." ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "& \t 1\n", "(Herder-Verlag) \t 1\n", "- \t 1\n", "/ \t 2\n", "1950 \t 1\n", "24 \t 1\n", "30 \t 1\n", "30-Jährige \t 1\n", "
-D mapreduce.job.output.key.comparator.class=\\\n", " org.apache.hadoop.mapred.lib.KeyFieldBasedComparator\n", " \n", "This class has some options similar to the Unix `sort`(`-n` to sort numerically, `-r` for reverse sorting, `-k pos1[,pos2]` for specifying fields to sort by).\n", "\n", "Let us see the comparator in action on our data to get the desired result. Note that this time we are removing `output2` because we're running the second mapreduce job again." ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "packageJobJar: [swap_keyval.sh] [/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/jars/hadoop-streaming-3.0.0-cdh6.3.0.jar] /tmp/streamjob1477140608741397366.jar tmpDir=null\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "19/11/18 08:52:14 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.\n", "19/11/18 08:52:16 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm472\n", "19/11/18 08:52:16 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/datalab/.staging/job_1571899345744_0255\n", "19/11/18 08:52:16 INFO mapred.FileInputFormat: Total input files to process : 1\n", "19/11/18 08:52:17 INFO mapreduce.JobSubmitter: number of splits:2\n", "19/11/18 08:52:17 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address\n", "19/11/18 08:52:17 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled\n", "19/11/18 08:52:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1571899345744_0255\n", "19/11/18 08:52:17 INFO mapreduce.JobSubmitter: Executing with tokens: []\n", "19/11/18 08:52:17 INFO conf.Configuration: resource-types.xml not found\n", "19/11/18 08:52:17 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.\n", "19/11/18 08:52:17 INFO impl.YarnClientImpl: Submitted application application_1571899345744_0255\n", "19/11/18 08:52:17 INFO mapreduce.Job: The url to track the job: http://c101.local:8088/proxy/application_1571899345744_0255/\n", "19/11/18 08:52:17 INFO mapreduce.Job: Running job: job_1571899345744_0255\n", "19/11/18 08:52:25 INFO mapreduce.Job: Job job_1571899345744_0255 running in uber mode : false\n", "19/11/18 08:52:25 INFO mapreduce.Job: map 0% reduce 0%\n", "19/11/18 08:52:32 INFO mapreduce.Job: map 100% reduce 0%\n", "19/11/18 08:52:39 INFO mapreduce.Job: map 100% reduce 100%\n", "19/11/18 08:52:40 INFO mapreduce.Job: Job job_1571899345744_0255 completed successfully\n", "19/11/18 08:52:40 INFO mapreduce.Job: Counters: 54\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=1827\n", "\t\tFILE: Number of bytes written=689819\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=3634\n", "\t\tHDFS: Number of bytes written=1945\n", "\t\tHDFS: Number of read operations=11\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of write operations=2\n", "\t\tHDFS: Number of bytes read erasure-coded=0\n", "\tJob Counters \n", "\t\tLaunched map tasks=2\n", "\t\tLaunched reduce tasks=1\n", "\t\tRack-local map tasks=2\n", "\t\tTotal time spent by all maps in occupied slots (ms)=483288\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=238472\n", "\t\tTotal time spent by 
all map tasks (ms)=9294\n", "\t\tTotal time spent by all reduce tasks (ms)=4586\n", "\t\tTotal vcore-milliseconds taken by all map tasks=9294\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=4586\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=47585280\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=23480320\n", "\tMap-Reduce Framework\n", "\t\tMap input records=164\n", "\t\tMap output records=164\n", "\t\tMap output bytes=1945\n", "\t\tMap output materialized bytes=1929\n", "\t\tInput split bytes=224\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tReduce input groups=6\n", "\t\tReduce shuffle bytes=1929\n", "\t\tReduce input records=164\n", "\t\tReduce output records=164\n", "\t\tSpilled Records=328\n", "\t\tShuffled Maps =2\n", "\t\tFailed Shuffles=0\n", "\t\tMerged Map outputs=2\n", "\t\tGC time elapsed (ms)=163\n", "\t\tCPU time spent (ms)=2630\n", "\t\tPhysical memory (bytes) snapshot=1525448704\n", "\t\tVirtual memory (bytes) snapshot=18887892992\n", "\t\tTotal committed heap usage (bytes)=4505206784\n", "\t\tPeak Map Physical memory (bytes)=578318336\n", "\t\tPeak Map Virtual memory (bytes)=6292914176\n", "\t\tPeak Reduce Physical memory (bytes)=370286592\n", "\t\tPeak Reduce Virtual memory (bytes)=6302941184\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "\tFile Input Format Counters \n", "\t\tBytes Read=3410\n", "\tFile Output Format Counters \n", "\t\tBytes Written=1945\n", "19/11/18 08:52:40 INFO streaming.StreamJob: Output directory: wordcount/output2\n" ] } ], "source": [ "%%bash\n", "hdfs dfs -rmr wordcount/output2 2>/dev/null\n", "comparator_class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator\n", "mapred streaming \\\n", " -D mapreduce.job.output.key.comparator.class=$comparator_class \\\n", " -D mapreduce.partition.keycomparator.options=-nr \\\n", " -file swap_keyval.sh \\\n", " -input wordcount/output \\\n", " -output wordcount/output2 \\\n", " -mapper swap_keyval.sh" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 2 items\n", "-rw-r--r-- 3 datalab supergroup 0 2019-11-18 08:52 wordcount/output2/_SUCCESS\n", "-rw-r--r-- 3 datalab supergroup 1945 2019-11-18 08:52 wordcount/output2/part-00000\n" ] } ], "source": [ "!hdfs dfs -ls wordcount/output2" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "8\tdie\n", "6\tder\n", "4\tund\n", "4\tCookies\n", "3\tFall\n", "3\tSie\n", "3\tich\n", "3\tkann\n", "3\twarst\n", "3\tin\n" ] } ], "source": [ "!hdfs dfs -cat wordcount/output2/part-00000|head " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we get the output in the desired order." 
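] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a quick local sanity check (reasonable only because this output is tiny, and purely illustrative rather than part of the MapReduce pipeline), the same ranking can be reproduced by piping the word counts through Unix `sort`: its `-n` and `-r` options mirror the comparator options used above, although ties may come out in a different order." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# illustrative local check: sort the word counts by the second (count) field, numerically and in reverse\n", "!hdfs dfs -cat wordcount/output/part-00000 | sort -k2,2nr | head"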
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Specifying Configuration Variables with the -D Option \n", "\n", "With the `-D` option it is possible to override options set in the default configuration file [`mapred-default.xml`](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml)\n", "(see the [Apache Hadoop documentation](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Specifying_Configuration_Variables_with_the_-D_Option)).\n", "\n", "One option that might come in handy when dealing with out-of-memory issues in the sorting phase is `mapreduce.task.io.sort.mb`, the amount of memory, in MB, reserved for sorting:\n", " \n", "
-D mapreduce.task.io.sort.mb=512\n", "\n", " \n", "\n", " **Note:** the maximum value for `mapreduce.task.io.sort.mb` is 2047. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## What is word count useful for? \n", "Counting the frequencies of words is the basis of _indexing_ and facilitates the retrieval of relevant documents in search engines." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 2 }