# Introduction to Hadoop Ecosystem

**by Serhat Çevikel**

## Start SSH

First, let's bring the ssh server up, since the hadoop start scripts (such as start-dfs.sh) use ssh to launch the daemons:

In [None]:
sudo service ssh stop
sudo service ssh start

And check that sshd works:

In [None]:
service ssh status

## Configuration

Let's check some environment variables:

In [None]:
echo $HADOOP_HOME
echo $HADOOP_CONF_DIR
echo $HADOOP_PREFIX
echo $PATH

Let's view the important configuration files. This is a very simple configuration for a single-node (pseudo-distributed) setup:

In [None]:
ls $HADOOP_CONF_DIR

In [None]:
cat $HADOOP_CONF_DIR/core-site.xml

In [None]:
cat $HADOOP_CONF_DIR/hdfs-site.xml

In [None]:
cat $HADOOP_CONF_DIR/mapred-site.xml

In [None]:
cat $HADOOP_CONF_DIR/yarn-site.xml

In [None]:
cat $HADOOP_CONF_DIR/slaves

Before first use, the hdfs must be formatted. We skip this step now, since data has already been imported into hdfs and formatting would wipe it:

In [None]:
#yes Y | hdfs namenode -format

## HDFS

### Start and stop HDFS:

Start the hdfs services:

The
```Bash
2>&1 | grep -Pv "^WARNING"
```
part is there to suppress the annoying WARNING messages written to standard error
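To see what that filter does without touching Hadoop, here is a minimal sketch. The function `noisy` is made up for this illustration (it is not a Hadoop command) and writes one WARNING line to standard error and one normal line to standard output. `2>&1` merges standard error into standard output so that grep sees both streams. Plain `grep -v` is used here because this simple pattern does not need the Perl syntax that `-P` enables.

```Bash
# noisy is a hypothetical stand-in for a Hadoop command: it only imitates
# a tool that mixes warnings (stderr) with real output (stdout)
noisy() {
    echo "WARNING: something harmless" >&2
    echo "real output"
}

# Without 2>&1 the warning would bypass the pipe and reach the terminal
# With 2>&1 both streams pass through grep, and -v drops the WARNING lines
filtered=$(noisy 2>&1 | grep -v "^WARNING")
echo "$filtered"
```

Keep in mind that the filter hides every line that starts with WARNING, so drop it while debugging a command that misbehaves.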

In [None]:
start-dfs.sh 2>&1 | grep -Pv "^WARNING"

Check which hadoop services run:

In [None]:
jps

Check the status of hdfs:

In [None]:
hdfs dfsadmin -report 2>&1 | grep -Pv "^WARNING"

If the report shows that "Safe Mode is ON", run the command:

In [None]:
hdfs dfsadmin -safemode leave 2>&1 | grep -Pv "^WARNING"

And check again:

In [None]:
hdfs dfsadmin -report 2>&1 | grep -Pv "^WARNING"

To stop the services, you can use this command (it is commented out here so that hdfs keeps running):

In [None]:
# stop-dfs.sh

The logs exist at:

In [None]:
ls /opt/hadoop-2.9.2/logs/

## HDFS operations

First let's create a test file and import into hdfs:

In [None]:
echo "this is a test file" > ~/deneme

In [None]:
cat ~/deneme

In [None]:
hdfs dfs -put ~/deneme / 2>&1 | grep -Pv "^WARNING"

And check the file system:

In [None]:
hdfs dfs -ls / 2>&1 | grep -Pv "^WARNING"

Read the contents of the file in the hdfs:

In [None]:
hdfs dfs -cat /deneme 2>&1 | grep -Pv "^WARNING"

Create a copy of the file:

In [None]:
hdfs dfs -cp /deneme /deneme2 2>&1 | grep -Pv "^WARNING"

Check that it is created:

In [None]:
hdfs dfs -ls / 2>&1 | grep -Pv "^WARNING"

Create a directory named somedir in hdfs:

In [None]:
hdfs dfs -mkdir /somedir 2>&1 | grep -Pv "^WARNING"

Check that it is created:

In [None]:
hdfs dfs -ls / 2>&1 | grep -Pv "^WARNING"

Move deneme2 into somedir:

In [None]:
hdfs dfs -mv /deneme2 /somedir 2>&1 | grep -Pv "^WARNING"

Check the contents of somedir:

In [None]:
hdfs dfs -ls /somedir 2>&1 | grep -Pv "^WARNING"

Now export somedir from hdfs to local file system:

In [None]:
hdfs dfs -get /somedir ~/ 2>&1 | grep -Pv "^WARNING"

Check whether it exists in the local file system:

In [None]:
ls ~

In [None]:
ls ~/somedir

Check the disk usage of files and directories:

In [None]:
hdfs dfs -du / 2>&1 | grep -Pv "^WARNING"

What exists under /data?

In [None]:
hdfs dfs -ls /data 2>&1 | grep -Pv "^WARNING"

We know the imdb, comtrade_s1 and he_sisli datasets from previous sessions

- ncdc is a part of a huge dataset on detailed meteorological data for USA beginning from 1901
- ngrams is a dataset of the words that appeared in books at books.google.com

We will try to make use of all this data in this and the following session

Now delete the directory /somedir in hdfs. Note that we should pass the recursive (-r) option just as we should do in the local file system:

In [None]:
hdfs dfs -rm -r /somedir 2>&1 | grep -Pv "^WARNING"

And check the hdfs for errors:

In [None]:
hdfs fsck / 2>&1 | grep -Pv "^WARNING"

You can view the report at:

In [None]:
curl "http://localhost:50070/fsck?ugi=hadoop&path=%2F"

To access a remote or local system through the web UI, you can point your browser to the above link (not in binder)

More commands are listed at:

https://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html

https://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-common/FileSystemShell.html

https://data-flair.training/blogs/top-hadoop-hdfs-commands-tutorial/

https://www.edureka.co/blog/hdfs-commands-hadoop-shell-command

```Bash
Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...]
	[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] [-x] <path> ...]
	[-expunge]
	[-find <path> ... <expression> ...]
	[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-getfacl [-R] <path>]
	[-getfattr [-R] {-n name | -d} [-e en] <path>]
	[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
	[-help [cmd ...]]
	[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-renameSnapshot <snapshotDir> <oldName> <newName>]
	[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
	[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
	[-setfattr {-n name [-v value] | -x name} <path>]
	[-setrep [-R] [-w] <rep> <path> ...]
	[-stat [format] <path> ...]
	[-tail [-f] <file>]
	[-test -[defsz] <path>]
	[-text [-ignoreCrc] <src> ...]
	[-touchz <path> ...]
	[-truncate [-w] <length> <path> ...]
	[-usage [cmd ...]]
```

**EXERCISE 1:**

Use the commands above at least once in an example of your own:
- On the local filesystem, create a file named test2 that holds the numbers 1 to 100 using seq
- Create a directory named testdir on hdfs
- Import the test2 file into hdfs under the testdir directory
- Rename the testdir directory to testdir2 using mv on hdfs
- Copy the test2 file to the root of hdfs
- On the local filesystem, create a file named test3 that holds the numbers 101 to 200 using seq
- Append test3 to the test2 file on hdfs using the -appendToFile command
- Cat the test2 file on hdfs
- Find the files and directories on hdfs whose names include "test"
- List the files under the root of hdfs
- Find the disk usage of all directories under /data on hdfs (report human-friendly numbers)
- Export test2 from hdfs into the local file system (use the force option to overwrite the existing test2)

Note that most of the common flags of shell commands also apply to their hdfs equivalents

In [None]:
pass1=
encrypt="U2FsdGVkX190J7s7LVS6p2cqrbvdJpxJNOGHMATezfYAhbeuCpP2EZPDpxVFyR5k 9DCRgkB/keBMrs0b8Jk6TR0nyKJTJO1wzxTab3L/rmJ2biVxIPamOnW9cN8o4aUD o/oi3CyQQKor5Ubt7AOzJQ7d8fAxc3dNe7iNq6btxRAj5ENjL5KwqM5pBXuLc9O9 qaQqmoDowdClkObnWe5BlvPSB7tdQWYPG31aTjQw0GkUXuHaVE/gPTLCUTNornw7 OYpQxWiIyD/+xHu/tDRprapWOCc4uC/PetLHiz3BRYI+riQQ5+mjrZObfByzW5w0 dQ/lljz/SZEiqppHTBLIXm9ewGJUGetIs+wPtw4S1KfUen8GlNUZZ13dAc3fMZIY P+EeVtGeZaLPAAoa2a6vHv8mAUOwMmIwVfcbRtf2E5ngl6mJWp/Z02ElVunZP/zF GdpkXdp5NjiNBH2vE+7e+n7PynHnFX66W80Vs6IM/+r81qcKQUfwWgl2ti+ASLoy z2pNqqq6jc4SQUA8glImJX55r2sBxdCisn9fUDeOliVCgxCuxjNLTN5O6VA/KebD 9RUxUwDKX87NQiL3g7GNjjgXQ1O2yAusEULCCOZhNkoVnvwgVv1Z0ySY58auft8J oqov/7SX0igWtDHyo8yYdzzZIyYqoQK4LsI9yJJasLLNS/v44UgZCRSuyMBNGfkw piBBhATl9Q3MiWacKmpUE2VlZf432iqUTIkza5SLQ0zhQA3kNpwh7kNCdBu0krfc O3IFpH0m5sNQczdlrO/+AQylw9ht0oTP2oZEFONv3DI="
solution=$(echo $encrypt | openssl enc -md sha256 -aes-128-cbc -a -d -salt -pass pass:$pass1 2> /dev/null)
echo "$solution"; echo
for l in "$solution"; do eval "${l}"; done

## YARN

Start yarn services:

In [None]:
start-yarn.sh

Check whether resource manager works by:

In [None]:
jps

You can stop the service by:

In [None]:
#stop-yarn.sh

### Map Reduce Job: Word Count on NCDC Data

We will go through two examples using the ncdc weather data set (the commands below use the years 1901-1903):

- Word Count
- Max Temperature

Word count is the "hello world" of map reduce and is also used in the official documentation

We will first implement the map reduce job as a unix command with pipes

If this works right, we will run it using hadoop streaming

Two versions will be run:
- One with a mapper and reducer,
- And the other with a mapper, combiner and reducer

On larger datasets spread over many nodes, it is good practice to use a combiner so that the network traffic between the mappers and the reducers is minimized

Note that every script or command must read its data from stdin and write its result to stdout. It is best to delimit fields with tabs, since hadoop streaming by default treats the part of a line up to the first tab as the key

The mapper phase creates key value pairs out of the data. In this example, the mapper simply sends the original files to stdout

The "head" command is there only to limit the visible output and will not be a part of the mapper

In [None]:
cat ~/data/ncdc/19{01..03} | head

Next, the reducer will count the lines

In [None]:
cat ~/data/ncdc/19{01..03} | wc -l

Now we will convert this into a simple map reduce job:

- Note that the output folder should not exist, so flush it before running the job
- If the command takes parameters, wrap it in single or double quotes
- If the command includes pipes, do it like:
- `bash -c "your command | second command"`

- Note that input and output paths are inside the hdfs
- Mapper and reducer paths are inside the main filesystem
- Run these commands inside the docker shell:

First let's create a directory for outputs:

In [None]:
hdfs dfs -mkdir -p /output 2>&1 | grep -Pv "^WARNING"

Create a directory for outputs from jobs with ncdc dataset:

In [None]:
hdfs dfs -mkdir -p /output/ncdc 2>&1 | grep -Pv "^WARNING"

Before starting a new job, always make sure that its output directory does not exist; otherwise the job fails with an error. Here we clear everything under /output/ncdc:

In [None]:
hdfs dfs -rm -r -f /output/ncdc/* 2>&1 | grep -Pv "^WARNING"

The path to hadoop-streaming jar file is:

In [None]:
ls $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar

Let's get the paths to cat and wc:

In [None]:
which cat
which wc

The output directory should be non-existent before the command:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/ncdc/19{01..03} \
-output /output/ncdc/1 \
-mapper /bin/cat \
-reducer '/usr/bin/wc -l' \
2>&1 | grep -Pv "^WARNING"

Let's check the result:

In [None]:
hdfs dfs -cat /output/ncdc/1/* 2>&1 | grep -Pv "^WARNING"

It is in line with what we got from the local run

Now we will rewrite the job adding a combiner:

The combiner takes over the former job of the reducer: the line count is calculated separately for each map task.
The reducer then only has to add up these partial counts!

From the shell, the "bc" command will do it for us.

First let's see what happens until the reducer:

In [None]:
# Each file is sent to cat separately, so that we can see
# what the mapper and the combiner do on their own:
# "cat $file" is the mapper and "wc -l" is the combiner

for file in ~/data/ncdc/19{01..03}; do cat $file | wc -l; done

Now, add the reducer:
- "paste -sd+" puts a "+" sign between the numbers
- bc will calculate this formula, and add the numbers

In [None]:
for file in ~/data/ncdc/19{01..03}; do cat $file | wc -l; done | paste -sd+ | bc
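The same mapper, combiner and reducer chain can be tried on a few synthetic files, which makes the partial counts easy to verify by hand. This is only a sketch: the file names and contents are made up, and awk stands in for bc in the final sum to keep the example free of extra dependencies.

```Bash
# Three small synthetic "input splits" in a temporary directory (made-up data)
tmpdir=$(mktemp -d)
seq 1 5 > "$tmpdir/part1"   # 5 lines
seq 1 7 > "$tmpdir/part2"   # 7 lines
seq 1 3 > "$tmpdir/part3"   # 3 lines

# mapper (cat) + combiner (wc -l): one partial count per file
# tr strips the padding that some wc implementations add
partials=$(for f in "$tmpdir"/part*; do cat "$f" | wc -l | tr -d ' '; done)

# paste joins the partial counts into the formula that bc would evaluate
formula=$(printf '%s\n' "$partials" | paste -sd+ -)

# reducer: add up the partial counts (awk stands in for bc here)
total=$(printf '%s\n' "$partials" | awk '{ s += $1 } END { print s }')

echo "$formula = $total"
rm -r "$tmpdir"
```

The reducer only ever sees three small numbers instead of fifteen lines, which is the traffic saving that a combiner is meant to provide.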

Now let's convert it to a mapreduce job:

Note that we have a new output directory:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/ncdc/19{01..03} \
-output /output/ncdc/2 \
-mapper /bin/cat \
-combiner '/usr/bin/wc -l' \
-reducer "bash -c 'paste -sd+ | bc'" \
2>&1 | grep -Pv "^WARNING"

Now let's see the result:

In [None]:
hdfs dfs -cat /output/ncdc/2/* 2>&1 | grep -Pv "^WARNING"

### Title Count Across Years Using IMDB Dataset

Now we will revisit the old friend 'imdb' dataset:

We will first split the title.basics file into equal parts

In [None]:
tldr split

Let's split the first 30k rows of title.basics.tsv into 3 parts:

In [None]:
mkdir -p ~/data/split

In [None]:
cd ~/data/split && \
head -30000 ~/data/imdb/tsv2/title.basics.tsv | tail -n+2 | split -l 10000

In [None]:
ls ~/data/split
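As a side note on the file names listed above: when no prefix is given, split names its pieces xaa, xab, xac and so on. A minimal sketch on synthetic data (30 numbered lines instead of the imdb rows) shows the behaviour:

```Bash
tmpdir=$(mktemp -d)

# 30 numbered lines stand in for the rows of the real file (synthetic data)
seq 1 30 > "$tmpdir/rows"

# -l 10 puts 10 lines into each piece; the pieces land in the current directory
pieces=$(cd "$tmpdir" && split -l 10 rows && ls x*)
printf '%s\n' "$pieces"

# every full piece holds exactly 10 lines
lines_in_first=$(wc -l < "$tmpdir/xaa" | tr -d ' ')
echo "$lines_in_first"

rm -r "$tmpdir"
```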

In [None]:
hdfs dfs -put -f ~/data/split /data 2>&1 | grep -Pv "^WARNING"

In [None]:
hdfs dfs -ls /data 2>&1 | grep -Pv "^WARNING"

**EXERCISE 2:**

Your job is to:
- Get the start year column of the files as the mapper
- Get the count of each year as the combiner
- Aggregate the per-task counts of each year as the reducer

You can use the "cut" command to get the necessary column. Note that default field delimiter for cut is "\t", the same as the files
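To get a feel for cut before applying it to the real files, here is a small sketch on two made-up tab separated rows. The rows and the chosen field are illustrative only and do not correspond to the columns of the imdb files.

```Bash
# Two synthetic tab separated rows (made-up data, not from the imdb files)
rows=$(printf 'id1\tmovie\tAlpha\nid2\tshort\tBeta\n')

# -f picks fields by position; no -d is needed because tab is the default
second=$(printf '%s\n' "$rows" | cut -f2)
printf '%s\n' "$second"
```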

You can view the initial rows of the file with head, so that you decide on which column to extract

For the combiner and reducer, you can use a small but very talented tool called "q", which is installed in the sandbox. "q" uses sqlite as its backend; however, it does not need a database: it can work on columnar data fed from stdin. The usual sql statements just work!

The only differences are that you should write "-" in the "from" clause and that fields are named c1, c2, etc. You can use select, from, group by and aggregate functions such as count() or sum().

Note that when passing a command to the combiner or reducer, you should wrap it in quotes. But the statement itself needs quotes too. So one of the quote pairs should be single and the other double, as such: -combiner 'q "select ........"'

The output path you provide must be non-existent. So either provide a new one or flush the existing one.

First try the mapper, combiner and reducer on the command line:

```Bash
# only mapper
for file in ~/data/split/*; do your_mapper | head -3; echo; done

1894
1892
1892

1919
1919
1919

1929
1929
1930
```

Then we add the combiner ("head" and "column" are there just for visual purposes).

Note that in q, when there is no header row, columns are referred to as c1, c2, and the standard input is "-" in the FROM clause, as such:
```SQL
SELECT c10
FROM -
```

```Bash
# mapper + combiner
for file in ~/data/split/*; do your_mapper | \
q "your_combiner" | \
head -3; done

1892 3
1893 1
1894 6

1912 1
1915 4
1916 5

1919 1
1926 1
1927 1
```

And last, we add our reducer:

```Bash
# mapper + combiner + reducer
for file in ~/data/split/*; do your_mapper | \
q "your_combiner"; done | \
q "your_reducer" | \
pr --columns=3

1892 3			1911 506		1930 937
1893 1			1912 601		1931 1023
1894 6			1913 980		1932 1072
1895 18			1914 1225		1933 1062
1896 105		1915 1475		1934 1176
1897 38			1916 1235		1935 1182
1898 44			1917 1211		1936 1261
1899 46			1918 1038		1937 1238
1900 82			1919 1076		1938 675
1901 35			1920 941		1939 55
1902 35			1921 953		1940 13
1903 57			1922 892		1942 4
1904 21			1923 831		1944 1
1905 33			1924 888		1945 1
1906 41			1925 1021		1946 1
1907 49			1926 997		1949 1
1908 158		1927 1009		1950 1
1909 307		1928 1007		1951 2
1910 363		1929 965		1993 1
```


Now we are ready to run it as a mapreduce job on hdfs and yarn. Before that, we flush the output directory (or we could pass a non-existent one):

In [None]:
hdfs dfs -rm -r -f /output/split \
2>&1 | grep -Pv "^WARNING"

And run your mapreduce job:

```Bash
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/split \
-output /output/split \
-mapper "your_mapper" \
-combiner 'q "your_combiner"' \
-reducer 'q "your_reducer"' \
2>&1 | grep -Pv "^WARNING"

1892 3			1911 506		1930 937	
1893 1			1912 601		1931 1023	
1894 6			1913 980		1932 1072	
1895 18			1914 1225		1933 1062	
1896 105		1915 1475		1934 1176	
1897 38			1916 1235		1935 1182	
1898 44			1917 1211		1936 1261	
1899 46			1918 1038		1937 1238	
1900 82			1919 1076		1938 675	
1901 35			1920 941		1939 55	
1902 35			1921 953		1940 13	
1903 57			1922 892		1942 4	
1904 21			1923 831		1944 1	
1905 33			1924 888		1945 1	
1906 41			1925 1021		1946 1	
1907 49			1926 997		1949 1	
1908 158		1927 1009		1950 1	
1909 307		1928 1007		1951 2	
1910 363		1929 965		1993 1	
```


**SOLUTION 2:**

In [None]:
pass1=
#encrypt="U2FsdGVkX1/3ITjk9bC8+/uY393hUM9NM+lLH5yhYtatuHSSPiyLLXtpQEv0r7dV 5mEYMVLspvDT7nD6aQOz9KFsGra/d34BKOMZxg3rTga5/sNlsuxYAWQ/c02s/9wE qYAq+VYVeLXfvDF/0Ihjr/3DEpJywvl/UhN+9idf/df4zvogW0UAwg/ruvUPuMq3 ixlLMN0b1NhE0mrv+OE/iveCtaf87nhWEoz9i0JlD4jp1vA40pM0ro22z9bBUcWk u/ythMTpzOb0Cumci/lWZ0jiCElECtBVB5jLC2P6WOtQxHX8qRQYWs2GPigGZIFP ZdL6Twwq5sXnsZDKF09NOHgJMiAv/RK9DGsRaNOjrYPs+UNXiGLWe0n+LkYoiP3+ kQrBIQJXKk2cRp/a76VCu//EWaj+bW0gjcfkrVQUyi8J3PUMCLb9JYgAeQHzyAoe zNTHne9dfDd8sj01MDYaTzzfa7evSNnSxvH00on8k1ksoYZcP8IRlNF6rx4ZANEH AzfyZRme8aV3GLJw1uAANVt0/SYnbdLQqy4Z4oYfnmIR+LUmXYqaocKuIeORBhqT bBl1UCj5hjiIZoRGTraTo6kuUfptah68fhm7+WXppbtGhGrlkqx+WWfzbGJP2+Fb VFq7yV8wXyc/mJfSFwzviyowKx1JkahVpHX3NKgPiDPPsAjiIGq+WK/jsnx75cFG ilLM1lNDM21LMviVoUq+EjUvVbmMSa8/knE9VXePQk07xJhk3yv6uuktzFIfkKrH 4N0fX0hEmcC6BDYfX1XcwOqOk5bUCGYkNNpU2fyHSaqBK4bSNhg04K9fnGYribVg EyFlpOB/bAM3BFrQL/vyW8S35EBAs5NLMFKTT9CHqSJ+TPZPahj6gwtmymiq1gz1 yLQsPUobBppWKnFvasneR8lfrqfRk3rVEvgpGu7+9Tz270VUoO+BKpm/FoVQqvm2 Qxwflk23pAvEeIJLGcyj68NJYktAQhi4BWeiAkpATGdMem3YPI++jusgM4oJumKA KO/wAEUuN3c6lgWCZ4Kbm1fqW3LLm2X8av5D5Ynsh3XgXglu7TbWtvu6Q7TC5M75 TNf+WTt/MG03P0Spqgty8hbCRb60IJhZJYJ+2+O9MNBScfSeDIqPF+s7lU0msI5G whUiXoY8b5Zuyo5aqiqtS/dRdCXwuio5DskBebLlB7YDTdAtmCtzvVNCYQUg5YCG 9VRUzj1gc9N75S0gFPv8fg=="
solution=$(cat ~/crypt/06_mr1.crypt | openssl enc -md sha256 -aes-128-cbc -a -d -salt -pass pass:$pass1 2> /dev/null)
echo "$solution"; echo
for l in "$solution"; do eval "${l}"; done

### Creating Scripts for MapReduce Job: Maximum Temperature Example on NCDC Dataset

Now, using the same NCDC data set we'll recreate the maximum temperature example from the Elephant Book as a "Hadoop streaming" job:

Your computers have a century of temperature data for the USA. We will use just the first 10 years of this data

And for each year we will get the max temperature

We will first start with standard unix tools

I played a bit with the original script given as a part of the supplementary material for the Elephant Book

Note that there may be empty files and the job must have a remedy for this issue

The script is as follows:

```Bash
#!/usr/bin/env bash

# adjusted by Serhat Cevikel to measure the time

#START=$(date +%s.%N)
path=$1
starty=$2
endy=$3

years=$(seq $starty $endy)


for year in $years
do
 filee="${path}/${year}"
 echo -ne `basename $year `"\t"
 cat $filee | \
 awk '{ temp = substr($0, 88, 5) + 0;
 q = substr($0, 93, 1);
 if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
 END { print max }'
done
```
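The awk part of the script can be tried on its own with synthetic fixed-width records, which makes the column positions and the filter easier to follow. This is only a sketch: `mk_record` is a hypothetical helper that pads a record with filler characters, and the `max == ""` guard is an addition of this sketch so that a year with only negative readings would still report a value.

```Bash
# Build a synthetic fixed-width record: 87 filler characters, then the
# temperature in columns 88-92 and the quality code in column 93
# (mk_record is a hypothetical helper; real records carry many more fields)
mk_record() {
    printf '%087d%s%s\n' 0 "$1" "$2"
}

records=$(
    mk_record "+0123" 1    # valid reading
    mk_record "+9999" 1    # coded as missing, excluded
    mk_record "+0250" 2    # quality code not accepted, excluded
    mk_record "-0050" 4    # valid reading
)

# Same extraction and filter as in the script, plus the guard for the first value
max_temp=$(printf '%s\n' "$records" | awk '
    { temp = substr($0, 88, 5) + 0
      q = substr($0, 93, 1)
      if (temp != 9999 && q ~ /[01459]/ && (max == "" || temp > max)) max = temp }
    END { print max }')
echo "$max_temp"
```

Only the first and last records pass the filter, so the larger of the two is reported.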

- Missing temp values are coded as 9999 and they are excluded
- "q" is a quality code and should be one of 0,1,4,5,9 for the reading to be included

The path to script is ~/codes/hadoop_max_temperature.sh

In my implementation it takes 3 parameters: the path to the data files, the start year and the end year

The data resides at ~/data/ncdc

Let's do it for 1901 to 1903

In [None]:
~/codes/hadoop_max_temperature.sh \
~/data/ncdc \
1901 1903

Now let's transform it to a map reduce job:

In the map phase, the key value pairs are extracted from the data: the year and temp reading

In the reduce phase the max temp for each year is calculated
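Before looking at the R scripts, the reduce logic can be sketched with standard tools on a few made-up key value pairs. In this sketch sort stands in for the shuffle step that groups the pairs by key between the two phases, and awk plays the role of the reducer. The readings are synthetic.

```Bash
# Synthetic mapper output: year <TAB> temperature (made-up readings)
mapped=$(printf '1901\t120\n1902\t-30\n1901\t317\n1902\t244\n1901\t-50\n')

# sort stands in for the shuffle step that groups the pairs by key;
# the awk reducer keeps the largest value seen for each key
reduced=$(printf '%s\n' "$mapped" | sort | awk -F'\t' '
    !($1 in max) || $2 + 0 > max[$1] { max[$1] = $2 + 0 }
    END { for (k in max) print k "\t" max[k] }' | sort)
printf '%s\n' "$reduced"
```

Taking a maximum of partial maxima gives the same answer as taking the maximum over all values at once, which is why a reducer of this kind can also serve as a combiner.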

The mapper is:

```R
#!/usr/bin/Rscript

con <- file("stdin")
#con <- file("1910")
liness <- readLines(con)
close(con)

year <- as.numeric(substr(liness, 16, 19))
temp <- as.numeric(substr(liness, 88, 92))
qq <- as.numeric(substr(liness, 93, 93))

output <- cbind(year, temp)

output <- output[temp != 9999 & qq %in% c(0, 1, 4, 5, 9), , drop = FALSE] # drop = FALSE keeps a matrix even if a single row remains

for (i in seq_along(output[,1]))
{
 pasted <- paste(output[i,], collapse = "\t")
 cat(sprintf("%s\n", pasted))
}
```

The code reads its input from stdin, so it can be tested in a local pipeline, as in the previous example, before it is run on Hadoop:

In [None]:
cat ~/data/ncdc/{1901..1903} | ~/codes/mapper.R | head

The reducer code is as follows:

```R
#!/usr/bin/Rscript

con <- file("stdin")
#con <- file("mapped")
liness <- readLines(con)
close(con)

keyval <- list()

for (i in seq_along(liness))
{
 linex <- unlist(strsplit(liness[i], split = "\t"))
 key <- linex[1]
 val <- as.numeric(linex[2])

 cur.maxval <- keyval[[key]]

 if (is.null(cur.maxval))
 { 
 keyval[[key]] <- val 
 }
 else
 { 
 if (val > cur.maxval)
 {
 keyval[[key]] <- val 
 }
 }
}

keys <- as.numeric(names(keyval))
vals <- as.numeric(unlist(keyval))

output <- matrix(c(keys, vals), ncol = 2)
output <- output[order(keys),, drop = F]

for (i in seq_along(output[,1]))
{
 pasted <- paste(output[i,], collapse = "\t")
 cat(sprintf("%s\n", pasted))
}
```

In [None]:
cat ~/data/ncdc/19{01..03} | ~/codes/mapper.R | ~/codes/reducer.R

Now we can run the map reduce job. Note that we have to pass the custom script files via the "-file" option so that all nodes can run them:

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/ncdc/19{01..03} \
-output /output/ncdc/3 \
-mapper ~/codes/mapper.R \
-reducer ~/codes/reducer.R \
-file ~/codes/mapper.R \
-file ~/codes/reducer.R \
2>&1 | grep -Pv "^WARNING"

Check the output:

In [None]:
hdfs dfs -cat /output/ncdc/3/* 2>&1 | grep -Pv "^WARNING"

Or we can run it with a combiner (the same script as the reducer for this case):

In [None]:
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \
-input /data/ncdc/19{01..03} \
-output /output/ncdc/4 \
-mapper ~/codes/mapper.R \
-combiner ~/codes/reducer.R \
-reducer ~/codes/reducer.R \
-file ~/codes/mapper.R \
-file ~/codes/reducer.R \
2>&1 | grep -Pv "^WARNING"

In [None]:
hdfs dfs -cat /output/ncdc/4/* 2>&1 | grep -Pv "^WARNING"

## SQOOP

We will use sqoop in order to import data
- from RDBMS
- into HDFS as text files
- or as hive tables

First let's start postgresql service:

In [None]:
sudo service postgresql start

Check whether it runs:

In [None]:
psql -U postgres -c "\l"

And let's remember the table names and table fields in imdb2: (You can also use the imdb_database file at home directory)

In [None]:
psql -U postgres -d imdb2 -c "\dt+"

In [None]:
psql -U postgres -d imdb2 -c "\d+ public.*"

And let's see whether sqoop can connect to the database:

In [None]:
sqoop list-tables --connect jdbc:postgresql://localhost:5432/imdb2 \
--username postgres 2>&1 | grep -Pv "^(Warning|Please|WARNING)"

Now, first import a single table as a whole into hdfs

The target directory should be non-existent. The direct flag is for fast imports:

In [None]:
sqoop import --connect jdbc:postgresql://localhost:5432/imdb2 \
--username postgres \
--table name_basics \
--split-by birthyear \
--target-dir /import1 \
--direct \
2>&1 | grep -Pv "^(Warning|Please|WARNING)"

In [None]:
hdfs dfs -ls /import1 2>&1 | grep -Pv "^WARNING"

And view the file:

In [None]:
hdfs dfs -cat /import1/* 2>&1 | grep -Pv "^WARNING" | head

Now let's run a query on the database and import the results.
Note that this time we split the output into 4 parts using the mapper (-m) flag:

In [None]:
sqoop import --connect jdbc:postgresql://localhost:5432/imdb2 \
--username postgres \
--query "SELECT * FROM name_basics WHERE birthyear > 1990 \
AND birthyear IS NOT NULL AND \$CONDITIONS" \
--split-by birthyear \
-m 4 \
--target-dir /import2 \
--direct \
2>&1 | grep -Pv "^(Warning|Please|WARNING)"
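To get an intuition for what happens to `$CONDITIONS`, the sketch below prints one range predicate per mapper. It is only an illustration of the idea that each mapper receives its own slice of the split column: the bounds 1991 and 2010 are assumed values, and the exact boundaries that sqoop computes may differ.

```Bash
# Assumed bounds of the split column and the number of mappers (illustrative values)
lo=1991; hi=2010; mappers=4

# Print one range predicate per mapper; each one would take the place of $CONDITIONS
conditions=$(awk -v lo="$lo" -v hi="$hi" -v m="$mappers" 'BEGIN {
    step = (hi - lo) / m
    for (i = 0; i < m; i++) {
        a = int(lo + i * step)
        b = int(lo + (i + 1) * step)
        # the last range is closed so that the maximum value is not lost
        op = (i == m - 1) ? "<=" : "<"
        printf "birthyear >= %d AND birthyear %s %d\n", a, op, b
    }
}')
printf '%s\n' "$conditions"
```

Each printed line covers a disjoint part of the range, so every row is imported by exactly one mapper and ends up in exactly one part file.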

Now let's check whether the new directory and file(s) exist:

In [None]:
hdfs dfs -ls /import2 2>&1 | grep -Pv "^WARNING"

In version 2.9.2 there is no -head command in hdfs (it was added in 3.1.0).
However, we can view the tails:

In [None]:
seq 0 3 | xargs -i hdfs dfs -tail /import2/part-m-0000{} 2>&1 | grep -Pv "^WARNING"

## Integrated Sqoop & MR exercise

### Phase 1: Sqoop

**EXERCISE 3:**

From title_basics select all movies that were shot between 1950 and 1953, whose runtimeminutes is between 60 and 100, whose genres include comedy and whose averagerating is not null

Return startyear from title_basics and join with title_ratings (using tconst) to return averagerating. Import the result into the /import3 directory on hdfs with sqoop, using 4 mappers and splitting over the averagerating values. Do not return tconst.

**Hint:** First run the query with psql and save the output into a variable (so as not to flood the browser window with a long output). Then check the head and the line count of the variable's contents. If it works, convert it to a sqoop job!

First flush the /import3 directory

In [None]:
hdfs dfs -rm -r /import3 2>&1 | grep -Pv "^WARNING"

The files should be as such:

```Bash
for i in {0..3};
do hdfs dfs -cat /import3/part-m-0000${i} 2>&1 | grep -Pv "^WARNING" | \
pee 'head -1' 'wc -l' | tr "\n" "\t" | xargs -i echo -e "{}";
done

1950,3.8	21	
1951,4.5	264	
1952,6.1	550	
1953,8.5	34
```

**SOLUTION 3:**

In [None]:
pass1=
solution=$(cat ~/crypt/06_sqoop1.crypt | openssl enc -md sha256 -aes-128-cbc -a -d -salt -pass pass:$pass1 2> /dev/null)
echo "$solution"; echo
for l in "$solution"; do eval "${l}"; done

### Phase 2: MapReduce

Now first delete the _SUCCESS file under /import3

In [None]:
hdfs dfs -rm /import3/_SUCCESS 2>&1 | grep -Pv "^WARNING"

And check the first lines and total lines:

In [None]:
for i in {0..3};
do hdfs dfs -cat /import3/part-m-0000${i} 2>&1 | grep -Pv "^WARNING" | \
pee 'head -1' 'wc -l' | tr "\n" "\t" | xargs -i echo -e "{}";
done

**EXERCISE 4:**

Write a map reduce job with a mapper, combiner and reducer to return the average ratings for each year. You can recycle our previous example with cut and q

* Hint 1: First export the files out of hdfs and try it on the shell, as we did before
* Hint 2: Check your result against an SQL query that serves the same purpose

* Note 1: q -tT is a shorthand for accepting and emitting tab separated values
* Note 2: The key value pairs emitted at the map stage should be tab delimited. However, passing quoted and escaped commands to hadoop-streaming on the command line is error-prone. To overcome this issue, wrap your mapper into a script (so that it works as in our R example above). You can do this only for the mapper, or also for the combiner and reducer:

First create a directory:

In [None]:
mkdir -p ~/scripts

Write your scripts as such (for the mapper, the easiest way can be to use tr or sed to replace "," with tabs):
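The general pattern of writing a small wrapper script with a here-document can be sketched as follows. The script name `demo_mapper.sh` and the temporary directory are placeholders for this illustration, and the sample line is made up to resemble the comma separated sqoop output.

```Bash
tmpdir=$(mktemp -d)

# The quoted 'EOF' keeps the current shell from expanding anything inside
# the here-document (demo_mapper.sh is a hypothetical name)
cat > "$tmpdir/demo_mapper.sh" << 'EOF'
#!/usr/bin/env bash
# replace commas with tabs so that the output is tab delimited
tr ',' '\t'
EOF

# a script passed to hadoop streaming has to be executable
chmod +x "$tmpdir/demo_mapper.sh"

# Try it on one synthetic line (run through bash here; with the
# executable bit set it can also be called directly)
mapped=$(echo "1950,3.8" | bash "$tmpdir/demo_mapper.sh")
printf '%s\n' "$mapped"

rm -r "$tmpdir"
```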

In [None]:
cat > ~/scripts/mapper.sh < ~/scripts/combiner.sh < ~/scripts/reducer.sh < /dev/null)
echo "$solution"; echo
for l in "$solution"; do eval "${l}"; done