{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Introduction to Hadoop Ecosystem\n", "\n", "**by Serhat Çevikel**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start SSH" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First let's get ssh server service up, since hadoop components communicate over ssh protocol:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sudo service ssh stop\n", "sudo service ssh start" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And check that sshd works:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "service ssh status" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configuration" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check some environment variables:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "echo $HADOOP_HOME\n", "echo $HADOOP_CONF_DIR\n", "echo $HADOOP_PREFIX\n", "echo $PATH" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's view important configuration files. 
This is a very simple configuration for standalone mode:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls $HADOOP_CONF_DIR" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat $HADOOP_CONF_DIR/core-site.xml" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat $HADOOP_CONF_DIR/hdfs-site.xml" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat $HADOOP_CONF_DIR/mapred-site.xml" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat $HADOOP_CONF_DIR/yarn-site.xml" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat $HADOOP_CONF_DIR/slaves" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before first use, the hdfs must be formatted. We do not do it now, since data is already imported into hdfs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#yes Y | hdfs namenode -format" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## HDFS" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start and stop HDFS:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start the hdfs services:\n", "\n", "The\n", "```Bash\n", "2>&1 | grep -Pv \"^WARNING\"\n", "```\n", "part is there for suppressing annoying WARNING messages in the standard error" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start-dfs.sh 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check which hadoop services run:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "jps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the status of hdfs:" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfsadmin -report 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If the report shows that \"Safe Mode is ON\", run the command:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfsadmin -safemode leave 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And check again:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfsadmin -report 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To stop the services you will use these commands:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# stop-dfs.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The logs exist at:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls /opt/hadoop-2.9.2/logs/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## HDFS operations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First let's create a test file and import into hdfs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "echo \"this a test file\" > deneme" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat deneme" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -put ~/deneme / 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And check the file system:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls / 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Read the contents of the file in the hdfs:" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cat /deneme 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a copy of the file:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cp /deneme /deneme2 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check that it is created:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls / 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a directory named somedir in hdfs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -mkdir /somedir 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check that it is created:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls / 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Move deneme2 into somedir:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -mv /deneme2 /somedir 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the contents of somedir:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls /somedir 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now export somedir from hdfs to local file system:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -get /somedir ~/ 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check whether it exists in the local file system:" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls ~" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls ~/somedir" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the disk usage of files and directories:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -du / 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What exists under /data?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls /data 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We know the imdb, comtrade_s1 and he_sisli datasets from previous sessions\n", "\n", "- ncdc is a part of a huge dataset on detailed meteorological data for USA beginning from 1901\n", "- ngrams is a dataset of the words that appeared in books at books.google.com\n", "\n", "We will try to make use of all this data in this and the following session" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now delete the directory /somedir in hdfs. 
Note that we should pass the recursive (-r) option, just as we would in the local file system:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -rm -r /somedir 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And check the hdfs for errors:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs fsck / 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can view the report at:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "curl \"http://localhost:50070/fsck?ugi=hadoop&path=%2F\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To access a remote or local system through the web UI, you can point your browser to the above link (not in binder)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "More commands are listed at:\n", "\n", "https://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html\n", "\n", "https://hadoop.apache.org/docs/r2.9.2/hadoop-project-dist/hadoop-common/FileSystemShell.html\n", "\n", "https://data-flair.training/blogs/top-hadoop-hdfs-commands-tutorial/\n", "\n", "https://www.edureka.co/blog/hdfs-commands-hadoop-shell-command" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```Bash\n", "Usage: hadoop fs [generic options]\n", "\t[-appendToFile <localsrc> ... <dst>]\n", "\t[-cat [-ignoreCrc] <src> ...]\n", "\t[-checksum <src> ...]\n", "\t[-chgrp [-R] GROUP PATH...]\n", "\t[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]\n", "\t[-chown [-R] [OWNER][:[GROUP]] PATH...]\n", "\t[-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]\n", "\t[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]\n", "\t[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...]\n", "\t[-cp [-f] [-p | -p[topax]] [-d] <src> ... 
<dst>]\n", "\t[-createSnapshot <snapshotDir> [<snapshotName>]]\n", "\t[-deleteSnapshot <snapshotDir> <snapshotName>]\n", "\t[-df [-h] [<path> ...]]\n", "\t[-du [-s] [-h] [-x] <path> ...]\n", "\t[-expunge]\n", "\t[-find <path> ... <expression> ...]\n", "\t[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]\n", "\t[-getfacl [-R] <path>]\n", "\t[-getfattr [-R] {-n name | -d} [-e en] <path>]\n", "\t[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]\n", "\t[-help [cmd ...]]\n", "\t[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]]\n", "\t[-mkdir [-p] <path> ...]\n", "\t[-moveFromLocal <localsrc> ... <dst>]\n", "\t[-moveToLocal <src> <localdst>]\n", "\t[-mv <src> ... <dst>]\n", "\t[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]\n", "\t[-renameSnapshot <snapshotDir> <oldName> <newName>]\n", "\t[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]\n", "\t[-rmdir [--ignore-fail-on-non-empty] <dir> ...]\n", "\t[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]\n", "\t[-setfattr {-n name [-v value] | -x name} <path>]\n", "\t[-setrep [-R] [-w] <rep> <path> ...]\n", "\t[-stat [format] <path> ...]\n", "\t[-tail [-f] <file>]\n", "\t[-test -[defsz] <path>]\n", "\t[-text [-ignoreCrc] <src> ...]\n", "\t[-touchz <path> ...]\n", "\t[-truncate [-w] <length> <path> ...]\n", "\t[-usage [cmd ...]]\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE 1:**\n", "\n", "Use the above commands at least once in an example of your own:\n", "- On the local filesystem, create a test2 file that holds the numbers 1 to 100 using seq\n", "- Create a directory named testdir on hdfs\n", "- Import the test2 file into hdfs under the testdir directory\n", "- Change the name of the testdir directory to testdir2 using mv on hdfs\n", "- Copy the test2 file to the root of hdfs\n", "- On the local filesystem, create a test3 file that holds the numbers 101 to 200 using seq\n", "- Append test3 to the test2 file in hdfs using the -appendToFile command\n", "- Cat the test2 file under hdfs\n", "- Find the files and directories on hdfs whose names include the phrase \"test\"\n", "- List the files under the root of hdfs\n", "- Find the disk usage of all directories under /data on hdfs (report as human friendly numbers)\n", "- Export test2 from hdfs into the local file system (use the force option to overwrite the existing test2)\n", "\n", 
"Note that most of common flags to shell commands also apply to hdfs equivalents" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pass1=\n", "encrypt=\"U2FsdGVkX190J7s7LVS6p2cqrbvdJpxJNOGHMATezfYAhbeuCpP2EZPDpxVFyR5k 9DCRgkB/keBMrs0b8Jk6TR0nyKJTJO1wzxTab3L/rmJ2biVxIPamOnW9cN8o4aUD o/oi3CyQQKor5Ubt7AOzJQ7d8fAxc3dNe7iNq6btxRAj5ENjL5KwqM5pBXuLc9O9 qaQqmoDowdClkObnWe5BlvPSB7tdQWYPG31aTjQw0GkUXuHaVE/gPTLCUTNornw7 OYpQxWiIyD/+xHu/tDRprapWOCc4uC/PetLHiz3BRYI+riQQ5+mjrZObfByzW5w0 dQ/lljz/SZEiqppHTBLIXm9ewGJUGetIs+wPtw4S1KfUen8GlNUZZ13dAc3fMZIY P+EeVtGeZaLPAAoa2a6vHv8mAUOwMmIwVfcbRtf2E5ngl6mJWp/Z02ElVunZP/zF GdpkXdp5NjiNBH2vE+7e+n7PynHnFX66W80Vs6IM/+r81qcKQUfwWgl2ti+ASLoy z2pNqqq6jc4SQUA8glImJX55r2sBxdCisn9fUDeOliVCgxCuxjNLTN5O6VA/KebD 9RUxUwDKX87NQiL3g7GNjjgXQ1O2yAusEULCCOZhNkoVnvwgVv1Z0ySY58auft8J oqov/7SX0igWtDHyo8yYdzzZIyYqoQK4LsI9yJJasLLNS/v44UgZCRSuyMBNGfkw piBBhATl9Q3MiWacKmpUE2VlZf432iqUTIkza5SLQ0zhQA3kNpwh7kNCdBu0krfc O3IFpH0m5sNQczdlrO/+AQylw9ht0oTP2oZEFONv3DI=\"\n", "solution=$(echo $encrypt | openssl enc -md sha256 -aes-128-cbc -a -d -salt -pass pass:$pass1 2> /dev/null)\n", "echo \"$solution\"; echo\n", "for l in \"$solution\"; do eval \"${l}\"; done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## YARN" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start yarn services:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "start-yarn.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check whether resource manager works by:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "jps" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can stop the service by:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#stop-yarn.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Map Reduce Job: Word Count on NCDC 
Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will go through two examples using the years 1901-1903 of the ncdc weather data set:\n", "\n", "- Word Count\n", "- Max Temperature" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This example is the \"hello world\" of map reduce and is also cited in the official documentation\n", "\n", "We will first implement the map reduce job as a unix command with pipes\n", "\n", "If this works right, we will run it using hadoop streaming\n", "\n", "Two versions will be run:\n", "- One with a mapper and reducer,\n", "- And the other with a mapper, combiner and reducer\n", "\n", "For larger datasets processed on many nodes, it is good practice to use a combiner so that the network traffic between the mapper and reducer is minimized\n", "\n", "Note that all scripts and commands must accept data from stdin and emit the result to stdout.\n", "It is best to delimit fields by tab" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The mapper phase creates key-value pairs out of the data. 
In the wc example, the original files are sent to stdout\n", "\n", "The \"head\" command is there just to limit the visible output and will not be part of the mapper" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat ~/data/ncdc/19{01..03} | head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, the reducer will count the lines" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat ~/data/ncdc/19{01..03} | wc -l" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will convert this into a simple map reduce job:\n", "\n", "- Note that the output folder should not exist, so flush it before running the job\n", "- If the command takes parameters, wrap it in single or double quotes\n", "- If the command includes pipes, do it like:\n", "- `bash -c \"your command | second command\"`\n", "\n", "- Note that input and output paths are inside the hdfs\n", "- Mapper and reducer paths are inside the local filesystem\n", "- Run these commands inside the docker shell:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First let's create a directory for outputs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -mkdir -p /output 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a directory for outputs from jobs with the ncdc dataset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -mkdir -p /output/ncdc 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before starting a new job, always clear out old results so that the job's output directory does not exist yet; otherwise an error is returned:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -rm -r -f /output/ncdc/* 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "The path to hadoop-streaming jar file is:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's get the paths to cat and wc:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "which cat\n", "which wc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output directory should be non-existent before the command:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \\\n", "-input /data/ncdc/19{01..03} \\\n", "-output /output/ncdc/1 \\\n", "-mapper /bin/cat \\\n", "-reducer '/usr/bin/wc -l' \\\n", "2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check the result:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cat /output/ncdc/1/* 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is in line with what he got from the local run" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will rewrite the job adding a combiner:\n", "\n", "The combiner will take over the job of reducer: For each task, the word count is calculated\n", "Now the reducer will just add the word counts!\n", "\n", "From the shell, the \"bc\" command will do it for us.\n", "\n", "First let's see what happens until the reducer:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# for file in /data/ncdc/txt/*; \\ # each file is send to cat separately\n", " # and we see what mapper, combiner and reducer does\n", "# do cat $file | \\ # mapper\n", "# wc -l; done # combiner\n", "\n", "for 
file in ~/data/ncdc/19{01..03}; do cat $file | wc -l; done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, add the reducer:\n", "- \"paste -sd+\" puts a \"+\" sign between the numbers\n", "- bc will calculate this formula, and add the numbers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for file in ~/data/ncdc/19{01..03}; do cat $file | wc -l; done | paste -sd+ | bc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's convert it to a mapreduce job:\n", "\n", "Note that we have a new output directory:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \\\n", "-input /data/ncdc/19{01..03} \\\n", "-output /output/ncdc/2 \\\n", "-mapper /bin/cat \\\n", "-combiner '/usr/bin/wc -l' \\\n", "-reducer \"bash -c 'paste -sd+ | bc'\" \\\n", "2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's see the result:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cat /output/ncdc/2/* 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Title Count Across Years Using IMDB Dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will revisit the old friend 'imdb' dataset:\n", "\n", "We will first split the title.basics file into equal parts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tldr split" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's split the first 30k rows of title.basics.tsv into 3 parts:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mkdir -p ~/data/split" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cd 
~/data/split && \\\n", "head -30000 ~/data/imdb/tsv2/title.basics.tsv | tail -n+2 | split -l 10000" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ls ~/data/split" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -put -f ~/data/split /data 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "hdfs dfs -ls /data 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE 2:**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Your job is to:\n", "- Get the start year column of the files as the mapper\n", "- Get the count of each year as the combiner\n", "- Aggregate the per-task counts of each year as the reducer\n", "\n", "You can use the \"cut\" command to get the necessary column. Note that the default field delimiter for cut is \"\\t\", the same as in the files\n", "\n", "You can view the initial rows of the file with head, so that you can decide which column to extract\n", "\n", "For the combiner and reducer, you can use a small but very capable tool called \"q\" inside the sandbox. \"q\" uses sqlite as its backend, but it does not need a database: it can work on columnar data fed from stdin. Usual sql statements just work!\n", "\n", "You only need to write \"-\" in the \"from\" clause, and fields are named c1, c2, etc. You can use select, from, group by and aggregate functions such as count() or sum().\n", "\n", "Note that when feeding into the combiner or reducer you should wrap the whole command inside quotes, but \n", "the statement itself also needs quotes. So one of the quote pairs should be single and the other double, as such: -combiner 'q \"select ........\"'\n", "\n", "The output path you provide must be non-existent. So either provide a new one or flush the existing one." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First try the mapper, combiner and reducer on the command line:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```Bash\n", "# only mapper\n", "for file in ~/data/split/*; do your_mapper | head -3; echo; done\n", "\n", "1894\n", "1892\n", "1892\n", "\n", "1919\n", "1919\n", "1919\n", "\n", "1929\n", "1929\n", "1930\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we add the combiner (\"head\" and \"column\" are there just for visual purposes):\n", "\n", "**Note** that in q, when there is no header row, columns are referred to as c1, c2, etc., and the standard input is written as \"-\" in the FROM clause, as such:\n", "```SQL\n", "SELECT c10\n", "FROM -\n", "```\n", "\n", "```Bash\n", "# mapper + combiner\n", "for file in ~/data/split/*; do your_mapper | \\\n", "q \"your_combiner\" | \\\n", "head -3; done\n", "\n", "1892 3\n", "1893 1\n", "1894 6\n", "\n", "1912 1\n", "1915 4\n", "1916 5\n", "\n", "1919 1\n", "1926 1\n", "1927 1\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And last, we add our reducer:\n", "\n", "```Bash\n", "# mapper + combiner + reducer\n", "for file in ~/data/split/*; do your_mapper | \\\n", "q \"your_combiner\"; done | \\\n", "q \"your_reducer\" | \\\n", "pr --columns=3\n", "\n", "1892 3\t\t\t1911 506\t\t1930 937\n", "1893 1\t\t\t1912 601\t\t1931 1023\n", "1894 6\t\t\t1913 980\t\t1932 1072\n", "1895 18\t\t\t1914 1225\t\t1933 1062\n", "1896 105\t\t1915 1475\t\t1934 1176\n", "1897 38\t\t\t1916 1235\t\t1935 1182\n", "1898 44\t\t\t1917 1211\t\t1936 1261\n", "1899 46\t\t\t1918 1038\t\t1937 1238\n", "1900 82\t\t\t1919 1076\t\t1938 675\n", "1901 35\t\t\t1920 941\t\t1939 55\n", "1902 35\t\t\t1921 953\t\t1940 13\n", "1903 57\t\t\t1922 892\t\t1942 4\n", "1904 21\t\t\t1923 831\t\t1944 1\n", "1905 33\t\t\t1924 888\t\t1945 1\n", "1906 41\t\t\t1925 1021\t\t1946 1\n", "1907 49\t\t\t1926 997\t\t1949 1\n", "1908 158\t\t1927 1009\t\t1950 1\n", "1909 307\t\t1928 
1007\t\t1951 2\n", "1910 363\t\t1929 965\t\t1993 1\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we are ready to run it as a mapreduce job on hdfs and yarn. Before that, we flush the output directory (or pass a non-existent one):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -rm -r -f /output/split \\\n", "2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And run your mapreduce job:\n", "\n", "```Bash\n", "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \\\n", "-input /data/split \\\n", "-output /output/split \\\n", "-mapper \"your_mapper\" \\\n", "-combiner 'q \"your_combiner\"' \\\n", "-reducer 'q \"your_reducer\"' \\\n", "2>&1 | grep -Pv \"^WARNING\"\n", "\n", "1892 3\t\t\t1911 506\t\t1930 937\t\n", "1893 1\t\t\t1912 601\t\t1931 1023\t\n", "1894 6\t\t\t1913 980\t\t1932 1072\t\n", "1895 18\t\t\t1914 1225\t\t1933 1062\t\n", "1896 105\t\t1915 1475\t\t1934 1176\t\n", "1897 38\t\t\t1916 1235\t\t1935 1182\t\n", "1898 44\t\t\t1917 1211\t\t1936 1261\t\n", "1899 46\t\t\t1918 1038\t\t1937 1238\t\n", "1900 82\t\t\t1919 1076\t\t1938 675\t\n", "1901 35\t\t\t1920 941\t\t1939 55\t\n", "1902 35\t\t\t1921 953\t\t1940 13\t\n", "1903 57\t\t\t1922 892\t\t1942 4\t\n", "1904 21\t\t\t1923 831\t\t1944 1\t\n", "1905 33\t\t\t1924 888\t\t1945 1\t\n", "1906 41\t\t\t1925 1021\t\t1946 1\t\n", "1907 49\t\t\t1926 997\t\t1949 1\t\n", "1908 158\t\t1927 1009\t\t1950 1\t\n", "1909 307\t\t1928 1007\t\t1951 2\t\n", "1910 363\t\t1929 965\t\t1993 1\t\n", "```\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**SOLUTION 2:**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pass1=\n", "#encrypt=\"U2FsdGVkX1/3ITjk9bC8+/uY393hUM9NM+lLH5yhYtatuHSSPiyLLXtpQEv0r7dV 5mEYMVLspvDT7nD6aQOz9KFsGra/d34BKOMZxg3rTga5/sNlsuxYAWQ/c02s/9wE qYAq+VYVeLXfvDF/0Ihjr/3DEpJywvl/UhN+9idf/df4zvogW0UAwg/ruvUPuMq3 
ixlLMN0b1NhE0mrv+OE/iveCtaf87nhWEoz9i0JlD4jp1vA40pM0ro22z9bBUcWk u/ythMTpzOb0Cumci/lWZ0jiCElECtBVB5jLC2P6WOtQxHX8qRQYWs2GPigGZIFP ZdL6Twwq5sXnsZDKF09NOHgJMiAv/RK9DGsRaNOjrYPs+UNXiGLWe0n+LkYoiP3+ kQrBIQJXKk2cRp/a76VCu//EWaj+bW0gjcfkrVQUyi8J3PUMCLb9JYgAeQHzyAoe zNTHne9dfDd8sj01MDYaTzzfa7evSNnSxvH00on8k1ksoYZcP8IRlNF6rx4ZANEH AzfyZRme8aV3GLJw1uAANVt0/SYnbdLQqy4Z4oYfnmIR+LUmXYqaocKuIeORBhqT bBl1UCj5hjiIZoRGTraTo6kuUfptah68fhm7+WXppbtGhGrlkqx+WWfzbGJP2+Fb VFq7yV8wXyc/mJfSFwzviyowKx1JkahVpHX3NKgPiDPPsAjiIGq+WK/jsnx75cFG ilLM1lNDM21LMviVoUq+EjUvVbmMSa8/knE9VXePQk07xJhk3yv6uuktzFIfkKrH 4N0fX0hEmcC6BDYfX1XcwOqOk5bUCGYkNNpU2fyHSaqBK4bSNhg04K9fnGYribVg EyFlpOB/bAM3BFrQL/vyW8S35EBAs5NLMFKTT9CHqSJ+TPZPahj6gwtmymiq1gz1 yLQsPUobBppWKnFvasneR8lfrqfRk3rVEvgpGu7+9Tz270VUoO+BKpm/FoVQqvm2 Qxwflk23pAvEeIJLGcyj68NJYktAQhi4BWeiAkpATGdMem3YPI++jusgM4oJumKA KO/wAEUuN3c6lgWCZ4Kbm1fqW3LLm2X8av5D5Ynsh3XgXglu7TbWtvu6Q7TC5M75 TNf+WTt/MG03P0Spqgty8hbCRb60IJhZJYJ+2+O9MNBScfSeDIqPF+s7lU0msI5G whUiXoY8b5Zuyo5aqiqtS/dRdCXwuio5DskBebLlB7YDTdAtmCtzvVNCYQUg5YCG 9VRUzj1gc9N75S0gFPv8fg==\"\n", "solution=$(cat ~/crypt/06_mr1.crypt | openssl enc -md sha256 -aes-128-cbc -a -d -salt -pass pass:$pass1 2> /dev/null)\n", "echo \"$solution\"; echo\n", "for l in \"$solution\"; do eval \"${l}\"; done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating Scripts for MapReduce Job: Maximum Temperature Example on NCDC Dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, using the same NCDC data set we'll recreate the maximum temperature example from the Elephant Book as a \"Hadoop streaming\" job:\n", "\n", "Your computers have a century of the temperature data of USA. 
We will use just the first 10 years of this data\n", "\n", "And for each year we will get the max temperature\n", "\n", "We will first start with standard unix tools\n", "\n", "I tweaked the original script a bit; it is given as part of the supplementary material for the Elephant Book\n", "\n", "Note that there may be empty files and the job must have a remedy for this issue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The script is as follows:\n", "\n", "```Bash\n", "#!/usr/bin/env bash\n", "\n", "# adjusted by Serhat Cevikel to measure the time\n", "\n", "#START=$(date +%s.%N)\n", "path=$1\n", "starty=$2\n", "endy=$3\n", "\n", "years=$(seq $starty $endy)\n", "\n", "\n", "for year in $years\n", "do\n", " filee=\"${path}/${year}\"\n", " echo -ne `basename $year `\"\\t\"\n", " cat $filee | \\\n", " awk '{ temp = substr($0, 88, 5) + 0;\n", " q = substr($0, 93, 1);\n", " if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }\n", " END { print max }'\n", "done\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Missing temp values are coded as 9999 and they are excluded.\n", "\"q\" is a quality code and should be one of 0,1,4,5,9 to be included" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The path to the script is ~/codes/hadoop_max_temperature.sh\n", "\n", "In my implementation it takes 3 parameters: the path to the data files, start year and end year\n", "\n", "The data resides at ~/data/ncdc\n", "\n", "Let's do it for 1901 to 1903" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "~/codes/hadoop_max_temperature.sh \\\n", "~/data/ncdc \\\n", "1901 1903" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's transform it into a map reduce job:\n", "\n", "In the map phase, the key value pairs are extracted from the data: the year and temp reading\n", "\n", "In the reduce phase the max temp for each year is calculated\n", "\n", "The mapper is:\n", "\n", "```R\n", 
"#!/usr/bin/Rscript\n", "\n", "con <- file(\"stdin\")\n", "#con <- file(\"1910\")\n", "liness <- readLines(con)\n", "close(con)\n", "\n", "year <- as.numeric(substr(liness, 16, 19))\n", "temp <- as.numeric(substr(liness, 88, 92))\n", "qq <- as.numeric(substr(liness, 93, 93))\n", "\n", "output <- cbind(year, temp)\n", "\n", "output <- output[temp != 9999 & qq %in% c(0, 1, 4, 5, 9),]\n", "\n", "for (i in seq_along(output[,1]))\n", "{\n", " pasted <- paste(output[i,], collapse = \"\\t\")\n", " cat(sprintf(\"%s\\n\", pasted))\n", "}\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The code accepts input from stdin so can be used similar to the previous one - before Hadoop" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat ~/data/ncdc/{1901..1903} | ~/codes/mapper.R | head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The reducer code is as follows:\n", "\n", "```R\n", "#!/usr/bin/Rscript\n", "\n", "con <- file(\"stdin\")\n", "#com <- file(\"mapped\")\n", "liness <- readLines(con)\n", "close(con)\n", "\n", "keyval <- list()\n", "\n", "for (i in seq_along(liness))\n", "{\n", " linex <- unlist(strsplit(liness[i], split = \"\\t\"))\n", " key <- linex[1]\n", " val <- as.numeric(linex[2])\n", "\n", " cur.maxval <- keyval[[key]]\n", "\n", " if (is.null(cur.maxval))\n", " { \n", " keyval[[key]] <- val \n", " }\n", " else\n", " { \n", " if (val > cur.maxval)\n", " {\n", " keyval[[key]] <- val \n", " }\n", " }\n", "}\n", "\n", "keys <- as.numeric(names(keyval))\n", "vals <- as.numeric(unlist(keyval))\n", "\n", "output <- matrix(c(keys, vals), ncol = 2)\n", "output <- output[order(keys),, drop = F]\n", "\n", "for (i in seq_along(output[,1]))\n", "{\n", " pasted <- paste(output[i,], collapse = \"\\t\")\n", " cat(sprintf(\"%s\\n\", pasted))\n", "}\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat ~/data/ncdc/19{01..03} | 
~/codes/mapper.R | ~/codes/reducer.R" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can run the map reduce job. Note that we have to pass the custom script files via \"-file\" option so that all nodes can run it:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \\\n", "-input /data/ncdc/19{01..03} \\\n", "-output /output/ncdc/3 \\\n", "-mapper ~/codes/mapper.R \\\n", "-reducer ~/codes/reducer.R \\\n", "-file ~/codes/mapper.R \\\n", "-file ~/codes/reducer.R \\\n", "2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the output:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cat /output/ncdc/3/* 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Or we can run it with a combiner (the same script as the reducer for this case):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.9.2.jar \\\n", "-input /data/ncdc/19{01..03} \\\n", "-output /output/ncdc/4 \\\n", "-mapper ~/codes/mapper.R \\\n", "-combiner ~/codes/reducer.R \\\n", "-reducer ~/codes/reducer.R \\\n", "-file ~/codes/mapper.R \\\n", "-file ~/codes/reducer.R \\\n", "2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cat /output/ncdc/4/* 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SQOOP" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will use sqoop in order to import data\n", "- from RDBMS\n", "- into HDFS as text files\n", "- or as hive tables" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First let's start postgresql service:" ] }, { "cell_type": 
"code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sudo service postgresql start" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check whether it runs:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "psql -U postgres -c \"\\l\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And let's remember the table names and table fields in imdb2: (You can also use the imdb_database file at home directory)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "psql -U postgres -d imdb2 -c \"\\dt+\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "psql -U postgres -d imdb2 -c \"\\d+ public.*\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And let's see whether sqoop can connect to the database:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sqoop list-tables --connect jdbc:postgresql://localhost:5432/imdb2 \\\n", "--username postgres 2>&1 | grep -Pv \"^(Warning|Please|WARNING)\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, first import a single table as a whole into hdfs\n", "\n", "The target directory should be non-existent. 
The direct flag is for fast imports:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sqoop import --connect jdbc:postgresql://localhost:5432/imdb2 \\\n", "--username postgres \\\n", "--table name_basics \\\n", "--split-by birthyear \\\n", "--target-dir /import1 \\\n", "--direct \\\n", "2>&1 | grep -Pv \"^(Warning|Please|WARNING)\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls /import1 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And view the file:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -cat /import1/* 2>&1 | grep -Pv \"^WARNING\" | head" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's run a query on the database and import the results:\n", "\n", "Note that now we split the output to 4 parts using the mapper (-m) flag. The literal `$CONDITIONS` token is required in a free-form query: sqoop replaces it with the split condition of each mapper:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sqoop import --connect jdbc:postgresql://localhost:5432/imdb2 \\\n", "--username postgres \\\n", "--query \"SELECT * FROM name_basics WHERE birthyear > 1990 \\\n", "AND birthyear IS NOT NULL AND \\$CONDITIONS\" \\\n", "--split-by birthyear \\\n", "-m 4 \\\n", "--target-dir /import2 \\\n", "--direct \\\n", "2>&1 | grep -Pv \"^(Warning|Please|WARNING)\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's check whether the new directory and file(s) exist:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -ls /import2 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Version 2.9.2 has no -head command in hdfs (it was added in 3.1.0)\n", "\n", "However, we can view the tails:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "seq 0 3 | xargs -i hdfs
dfs -tail /import2/part-m-0000{} 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Integrated Sqoop & MR exercise" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Phase 1: Sqoop" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE 3:**\n", "\n", "From title_basics, select all movies that were shot between 1950 and 1953, whose runtimeminutes is between 60 and 100, whose genres include comedy and whose averagerating is not null\n", "\n", "Return startyear from title_basics and join with title_ratings (using tconst) to return averagerating. Import into the /import3 directory in hdfs with sqoop, using 4 mappers and splitting over averagerating values. Do not return tconst.\n", "\n", "**Hint:** First run the query with psql and save the output into a variable (so as not to flood the browser window with a long output). Then check the head and line count of the variable contents. If it works, convert it to a sqoop job!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First flush the /import3 directory" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -rm -r /import3 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The resulting files should look like this (first line and line count of each part):\n", "\n", "```Bash\n", "for i in {0..3};\n", "do hdfs dfs -cat /import3/part-m-0000${i} 2>&1 | grep -Pv \"^WARNING\" | \\\n", "pee 'head -1' 'wc -l' | tr \"\\n\" \"\\t\" | xargs -i echo -e \"{}\";\n", "done\n", "\n", "1950,3.8\t21\t\n", "1951,4.5\t264\t\n", "1952,6.1\t550\t\n", "1953,8.5\t34\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**SOLUTION 3:**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pass1=\n", "solution=$(cat ~/crypt/06_sqoop1.crypt | openssl enc -md sha256 -aes-128-cbc -a -d -salt -pass pass:$pass1 2> /dev/null)\n", "echo \"$solution\"; echo\n", "for l in \"$solution\"; do eval
\"${l}\"; done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Phase 2: MapReduce" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now first delete the _SUCCESS file under /import3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hdfs dfs -rm /import3/_SUCCESS 2>&1 | grep -Pv \"^WARNING\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And check the first lines and total lines:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i in {0..3};\n", "do hdfs dfs -cat /import3/part-m-0000${i} 2>&1 | grep -Pv \"^WARNING\" | \\\n", "pee 'head -1' 'wc -l' | tr \"\\n\" \"\\t\" | xargs -i echo -e \"{}\";\n", "done" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**EXERCISE 4:**\n", "\n", "Write a map reduce job with a mapper, combiner and reducer to return the average ratings for each year. You can recycle our previous example with cut and q\n", "\n", "* Hint 1: First export the files out of hdfs and try it on the shell, as we did before\n", "* Hint 2: Check your result with the SQL query of the same purpose \n", "\n", "* Note2: q -tT is a shorthand for accepting and emitting tab separated values\n", "* Note3: The key value pairs emitted at the map stage should be tab delimited. However hadoop-streaming is problematic with quoting and escaping issues on the command line. To overcome this issue, wrap you mapper into a script as such (so it works as in our R example above). 
You can do it only for the mapper or also for the reducer and combiner:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First create a directory:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mkdir -p ~/scripts" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Write your script as such (easiest way can be tr or sed to replace \",\" with tabs for the mapper):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cat > ~/scripts/mapper.sh < ~/scripts/combiner.sh < ~/scripts/reducer.sh < /dev/null)\n", "echo \"$solution\"; echo\n", "for l in \"$solution\"; do eval \"${l}\"; done" ] } ], "metadata": { "kernelspec": { "display_name": "Bash", "language": "bash", "name": "bash" }, "language_info": { "codemirror_mode": "shell", "file_extension": ".sh", "mimetype": "text/x-sh", "name": "bash" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": true }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 4 }