# A simple MapReduce job with mrjob

mrjob is a Python module for writing multi-step MapReduce jobs in Python. In this notebook we're going to run a basic wordcount example. 

Find here the mrjob documentation: [https://mrjob.readthedocs.io/en/latest/](https://mrjob.readthedocs.io/en/latest/)

The file we're going to use is called `file.txt` and has a size of 500MB.

In [6]:
%%bash
ls -lh file.txt

-r--r--r-- 1 user123 hadoopusers 429M Apr 17 23:00 file.txt


Write a mrjob file `word_count.py` using the Jupyter cell magic `%%file`

In [10]:
%%file word_count.py
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

 def mapper(self, _, line):
 yield "chars", len(line)
 yield "words", len(line.split())
 yield "lines", 1

 def reducer(self, key, values):
 yield key, sum(values)


if __name__ == '__main__':
 MRWordFrequencyCount.run()

Overwriting word_count.py


We're going to use $10$ map and $3$ reduce tasks.

In [11]:
%%bash

DATAFILE=file.txt
STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
N=10

# N map tasks
python word_count.py --jobconf mapreduce.job.maps=$N --jobconf mapreduce.job.reduces=3 -r hadoop --hadoop-streaming-jar $STREAMING_JAR $DATAFILE

"lines"	24000
"chars"	447935288
"words"	70482885


No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 3.0.0
Creating temp directory /tmp/word_count.x123.20200417.210235.762742
Copying local files to hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210235.762742/files/...
Running step 1 of 1...
 packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob8532801643576016516.jar tmpDir=null
 Connecting to ResourceManager at c100.local/10.7.0.100:8032
 Connecting to ResourceManager at c100.local/10.7.0.100:8032
 Disabling Erasure Coding for path: /user/x123/.staging/job_1586332778980_6670
 Total input files to process : 1
 Adding a new node: /default/10.7.0.101:9866
 Adding a new node: /default/10.7.0.111:9866
 Adding a new node: /default/10.7.0.110:9866
 Adding a new node: /default/10.7.0.114:9866
 Adding a new node: /default/10.7.

Run the same job but this time with $4$ mappers and keep track of the job duration.

In [12]:
%%bash
START=$(date +%s);

DATAFILE=/home/dataLAB/data/wiki429MB # 429MB
STREAMING_JAR=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
N=4

# N map tasks
python word_count.py --jobconf mapreduce.job.maps=$N --jobconf mapreduce.job.reduces=3 -r hadoop --hadoop-streaming-jar $STREAMING_JAR $DATAFILE
2>/dev/null

END=$(date +%s);
echo $((END-START)) | awk '{print "Duration: "int($1/60)":"int($1%60)}'

"lines"	24000
"chars"	447935288
"words"	70482885
Duration: 0:56


No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 3.0.0
Creating temp directory /tmp/word_count.x123.20200417.210414.346782
Copying local files to hdfs:///user/x123/tmp/mrjob/word_count.x123.20200417.210414.346782/files/...
Running step 1 of 1...
 packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob1751647060912723961.jar tmpDir=null
 Connecting to ResourceManager at c100.local/10.7.0.100:8032
 Connecting to ResourceManager at c100.local/10.7.0.100:8032
 Disabling Erasure Coding for path: /user/x123/.staging/job_1586332778980_6672
 Total input files to process : 1
 number of splits:4
 yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
 Submitting tokens for job: job_1586332778980_6672
 Executing with tokens