{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# A simple MapReduce job with mrjob" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "mrjob is a Python module for writing multi-step MapReduce jobs in Python. In this notebook we're going to run a basic wordcount example. \n", "\n", "Find here the mrjob documentation: [https://mrjob.readthedocs.io/en/latest/](https://mrjob.readthedocs.io/en/latest/)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The file we're going to use is called `file.txt` and has a size of 500MB." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-r--r--r-- 1 user123 hadoopusers 429M Apr 17 23:00 file.txt\n" ] } ], "source": [ "%%bash\n", "ls -lh file.txt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Write a mrjob file `word_count.py` using the Jupyter cell magic `%%file`" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Overwriting word_count.py\n" ] } ], "source": [ "%%file word_count.py\n", "from mrjob.job import MRJob\n", "\n", "class MRWordFrequencyCount(MRJob):\n", "\n", " def mapper(self, _, line):\n", " yield \"chars\", len(line)\n", " yield \"words\", len(line.split())\n", " yield \"lines\", 1\n", "\n", " def reducer(self, key, values):\n", " yield key, sum(values)\n", "\n", "\n", "if __name__ == '__main__':\n", " MRWordFrequencyCount.run()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We're going to use $10$ map and $3$ reduce tasks." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\"lines\"\t24000\n", "\"chars\"\t447935288\n", "\"words\"\t70482885\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "No configs found; falling back on auto-configuration\n", "No configs specified for hadoop runner\n", "Looking for hadoop binary in $PATH...\n", "Found hadoop binary: /usr/bin/hadoop\n", "Using Hadoop version 3.0.0\n", "Creating temp directory /tmp/word_count.x123.20200417.210235.762742\n", "Copying local files to hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/files/...\n", "Running step 1 of 1...\n", " WARNING: Use \"yarn jar\" to launch YARN applications.\n", " packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob8532801643576016516.jar tmpDir=null\n", " Connecting to ResourceManager at c100.local/10.7.0.100:8032\n", " Connecting to ResourceManager at c100.local/10.7.0.100:8032\n", " Disabling Erasure Coding for path: /user/x123/.staging/job_1586332778980_6670\n", " Total input files to process : 1\n", " Adding a new node: /default/10.7.0.101:9866\n", " Adding a new node: /default/10.7.0.111:9866\n", " Adding a new node: /default/10.7.0.110:9866\n", " Adding a new node: /default/10.7.0.114:9866\n", " Adding a new node: /default/10.7.0.118:9866\n", " Adding a new node: /default/10.7.0.105:9866\n", " Adding a new node: /default/10.7.0.102:9866\n", " Adding a new node: /default/10.7.0.115:9866\n", " number of splits:10\n", " yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. 
Instead, use yarn.system-metrics-publisher.enabled\n", " Submitting tokens for job: job_1586332778980_6670\n", " Executing with tokens: []\n", " resource-types.xml not found\n", " Unable to find 'resource-types.xml'.\n", " Submitted application application_1586332778980_6670\n", " The url to track the job: http://c100.local:8088/proxy/application_1586332778980_6670/\n", " Running job: job_1586332778980_6670\n", " Job job_1586332778980_6670 running in uber mode : false\n", " map 0% reduce 0%\n", " map 100% reduce 0%\n", " map 100% reduce 33%\n", " map 100% reduce 100%\n", " Job job_1586332778980_6670 completed successfully\n", " Output directory: hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/output\n", "Counters: 55\n", "\tFile Input Format Counters \n", "\t\tBytes Read=450112396\n", "\tFile Output Format Counters \n", "\t\tBytes Written=49\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=266750\n", "\t\tFILE: Number of bytes written=3508513\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=450113846\n", "\t\tHDFS: Number of bytes read erasure-coded=0\n", "\t\tHDFS: Number of bytes written=49\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of read operations=45\n", "\t\tHDFS: Number of write operations=6\n", "\tJob Counters \n", "\t\tData-local map tasks=4\n", "\t\tLaunched map tasks=10\n", "\t\tLaunched reduce tasks=3\n", "\t\tRack-local map tasks=6\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=345584640\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=79928320\n", "\t\tTotal time spent by all map tasks (ms)=67497\n", "\t\tTotal time spent by all maps in occupied slots (ms)=3509844\n", "\t\tTotal time spent by all reduce tasks (ms)=15611\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=811772\n", "\t\tTotal vcore-milliseconds taken by all map tasks=67497\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=15611\n", "\tMap-Reduce Framework\n", "\t\tCPU time spent (ms)=20440\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tFailed Shuffles=0\n", "\t\tGC time elapsed (ms)=1626\n", "\t\tInput split bytes=1450\n", "\t\tMap input records=24000\n", "\t\tMap output bytes=870866\n", "\t\tMap output materialized bytes=293646\n", "\t\tMap output records=72000\n", "\t\tMerged Map outputs=30\n", "\t\tPeak Map Physical memory (bytes)=622194688\n", "\t\tPeak Map Virtual memory (bytes)=6293897216\n", "\t\tPeak Reduce Physical memory (bytes)=378032128\n", "\t\tPeak Reduce Virtual memory (bytes)=6305234944\n", "\t\tPhysical memory (bytes) snapshot=7270031360\n", "\t\tReduce input groups=3\n", "\t\tReduce input records=72000\n", "\t\tReduce output records=3\n", "\t\tReduce shuffle bytes=293646\n", "\t\tShuffled Maps =30\n", "\t\tSpilled Records=144000\n", "\t\tTotal committed heap usage (bytes)=19748356096\n", "\t\tVirtual memory (bytes) snapshot=81836744704\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "job output is in hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/output\n", "Streaming final output from hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/output...\n", "Removing HDFS temp directory hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742...\n", "Removing temp directory 
/tmp/word_count.x123.20200417.210235.762742...\n" ] } ], "source": [ "%%bash\n", "\n", "DATAFILE=file.txt\n", "STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar\n", "N=10\n", "\n", "# N map tasks\n", "python word_count.py --jobconf mapreduce.job.maps=$N --jobconf mapreduce.job.reduces=3 -r hadoop --hadoop-streaming-jar $STREAMING_JAR $DATAFILE" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the same job but this time with $4$ mappers and keep track of the job duration." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\"lines\"\t24000\n", "\"chars\"\t447935288\n", "\"words\"\t70482885\n", "Duration: 0:56\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "No configs found; falling back on auto-configuration\n", "No configs specified for hadoop runner\n", "Looking for hadoop binary in $PATH...\n", "Found hadoop binary: /usr/bin/hadoop\n", "Using Hadoop version 3.0.0\n", "Creating temp directory /tmp/word_count.x123.20200417.210414.346782\n", "Copying local files to hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/files/...\n", "Running step 1 of 1...\n", " WARNING: Use \"yarn jar\" to launch YARN applications.\n", " packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob1751647060912723961.jar tmpDir=null\n", " Connecting to ResourceManager at c100.local/10.7.0.100:8032\n", " Connecting to ResourceManager at c100.local/10.7.0.100:8032\n", " Disabling Erasure Coding for path: /user/x123/.staging/job_1586332778980_6672\n", " Total input files to process : 1\n", " number of splits:4\n", " yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. 
Instead, use yarn.system-metrics-publisher.enabled\n", " Submitting tokens for job: job_1586332778980_6672\n", " Executing with tokens: []\n", " resource-types.xml not found\n", " Unable to find 'resource-types.xml'.\n", " Submitted application application_1586332778980_6672\n", " The url to track the job: http://c100.local:8088/proxy/application_1586332778980_6672/\n", " Running job: job_1586332778980_6672\n", " Job job_1586332778980_6672 running in uber mode : false\n", " map 0% reduce 0%\n", " map 25% reduce 0%\n", " map 100% reduce 0%\n", " map 100% reduce 33%\n", " map 100% reduce 100%\n", " Job job_1586332778980_6672 completed successfully\n", " Output directory: hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/output\n", "Counters: 54\n", "\tFile Input Format Counters \n", "\t\tBytes Read=449811647\n", "\tFile Output Format Counters \n", "\t\tBytes Written=49\n", "\tFile System Counters\n", "\t\tFILE: Number of bytes read=266033\n", "\t\tFILE: Number of bytes written=2132040\n", "\t\tFILE: Number of large read operations=0\n", "\t\tFILE: Number of read operations=0\n", "\t\tFILE: Number of write operations=0\n", "\t\tHDFS: Number of bytes read=449812231\n", "\t\tHDFS: Number of bytes read erasure-coded=0\n", "\t\tHDFS: Number of bytes written=49\n", "\t\tHDFS: Number of large read operations=0\n", "\t\tHDFS: Number of read operations=27\n", "\t\tHDFS: Number of write operations=6\n", "\tJob Counters \n", "\t\tLaunched map tasks=4\n", "\t\tLaunched reduce tasks=3\n", "\t\tRack-local map tasks=4\n", "\t\tTotal megabyte-milliseconds taken by all map tasks=146442240\n", "\t\tTotal megabyte-milliseconds taken by all reduce tasks=79375360\n", "\t\tTotal time spent by all map tasks (ms)=28602\n", "\t\tTotal time spent by all maps in occupied slots (ms)=1487304\n", "\t\tTotal time spent by all reduce tasks (ms)=15503\n", "\t\tTotal time spent by all reduces in occupied slots (ms)=806156\n", "\t\tTotal vcore-milliseconds taken by all map tasks=28602\n", "\t\tTotal vcore-milliseconds taken by all reduce tasks=15503\n", "\tMap-Reduce Framework\n", "\t\tCPU time spent (ms)=11650\n", "\t\tCombine input records=0\n", "\t\tCombine output records=0\n", "\t\tFailed Shuffles=0\n", "\t\tGC time elapsed (ms)=485\n", "\t\tInput split bytes=584\n", "\t\tMap input records=24000\n", "\t\tMap output bytes=870866\n", "\t\tMap output materialized bytes=278702\n", "\t\tMap output records=72000\n", "\t\tMerged Map outputs=12\n", "\t\tPeak Map Physical memory (bytes)=625303552\n", "\t\tPeak Map Virtual memory (bytes)=6294085632\n", "\t\tPeak Reduce Physical memory (bytes)=383156224\n", "\t\tPeak Reduce Virtual memory (bytes)=6303899648\n", "\t\tPhysical memory (bytes) snapshot=3602575360\n", "\t\tReduce input groups=3\n", "\t\tReduce input records=72000\n", "\t\tReduce output records=3\n", "\t\tReduce shuffle bytes=278702\n", "\t\tShuffled Maps =12\n", "\t\tSpilled Records=144000\n", "\t\tTotal committed heap usage (bytes)=10420224000\n", "\t\tVirtual memory (bytes) snapshot=44077326336\n", "\tShuffle Errors\n", "\t\tBAD_ID=0\n", "\t\tCONNECTION=0\n", "\t\tIO_ERROR=0\n", "\t\tWRONG_LENGTH=0\n", "\t\tWRONG_MAP=0\n", "\t\tWRONG_REDUCE=0\n", "job output is in hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/output\n", "Streaming final output from hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/output...\n", "Removing HDFS temp directory hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782...\n", "Removing temp directory 
/tmp/word_count.x123.20200417.210414.346782...\n" ] } ], "source": [ "%%bash\n", "START=$(date +%s);\n", "\n", "DATAFILE=/home/dataLAB/data/wiki429MB  # the 429 MB input (same data as file.txt)\n", "STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar\n", "N=4\n", "\n", "# request N map tasks and 3 reduce tasks\n", "python word_count.py --jobconf mapreduce.job.maps=$N --jobconf mapreduce.job.reduces=3 -r hadoop --hadoop-streaming-jar $STREAMING_JAR $DATAFILE\n", "\n", "END=$(date +%s);\n", "echo $((END-START)) | awk '{print \"Duration: \"int($1/60)\":\"int($1%60)}'" ] },
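{ "cell_type": "markdown", "metadata": {}, "source": [ "As a side note, a per-word frequency count follows the same pattern: the mapper emits a `(word, 1)` pair for every word it finds, an optional combiner pre-aggregates those pairs on the map side to shrink the shuffle, and the reducer sums the counts per word. Below is a minimal sketch along the lines of the mrjob documentation; it has not been executed here, and the file name `word_freq.py` and the word regex are illustrative choices. It can be tried locally with mrjob's default inline runner (`python word_freq.py file.txt`) before submitting it to the cluster with `-r hadoop` as above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%file word_freq.py\n", "import re\n", "\n", "from mrjob.job import MRJob\n", "\n", "# illustrative word pattern: runs of word characters and apostrophes\n", "WORD_RE = re.compile(r\"[\\w']+\")\n", "\n", "\n", "class MRWordFreqCount(MRJob):\n", "\n", "    def mapper(self, _, line):\n", "        # emit (word, 1) for every word in the input line\n", "        for word in WORD_RE.findall(line):\n", "            yield word.lower(), 1\n", "\n", "    def combiner(self, word, counts):\n", "        # pre-aggregate on the map side to reduce shuffle volume\n", "        yield word, sum(counts)\n", "\n", "    def reducer(self, word, counts):\n", "        yield word, sum(counts)\n", "\n", "\n", "if __name__ == '__main__':\n", "    MRWordFreqCount.run()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 2 }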