{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"id": "view-in-github"
},
"source": [
""
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "GzbmlR27wh6e"
},
"source": [
"\n",
"\n",
"# MapReduce: A Primer with Hello World!
\n",
"
\n",
"
\n",
"\n",
"For this tutorial, we are going to download the core Hadoop distribution and run Hadoop in _local standalone mode_:\n",
"\n",
"> ❝ _By default, Hadoop is configured to run in a non-distributed mode, as a single Java process._ ❞\n",
"\n",
"(see [https://hadoop.apache.org/docs/stable/.../Standalone_Operation](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation))\n",
"\n",
"We are going to run a MapReduce job using MapReduce's [streaming application](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Hadoop_Streaming). This is not to be confused with real-time streaming:\n",
"\n",
"> ❝ _Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer._ ❞\n",
"\n",
"MapReduce streaming defaults to using [`IdentityMapper`](https://hadoop.apache.org/docs/stable/api/index.html) and [`IdentityReducer`](https://hadoop.apache.org/docs/stable/api/index.html), thus eliminating the need for explicit specification of a mapper or reducer. Finally, we show how to run a map-only job by setting `mapreduce.job.reduce` equal to $0$.\n",
"\n",
"Both input and output are standard files since Hadoop's default filesystem is the regular file system, as specified by the `fs.defaultFS` property in [core-default.xml](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/core-default.xml)).\n"
]
},
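{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a preview of the map-only case mentioned above, a minimal sketch of such a streaming invocation could look as follows (the input file and output directory names are illustrative):\n",
"\n",
"```bash\n",
"# sketch: a map-only streaming job, obtained by setting the number of reduce tasks to 0\n",
"mapred streaming \\\n",
"  -D mapreduce.job.reduces=0 \\\n",
"  -input hello.txt \\\n",
"  -output my_output_map_only \\\n",
"  -mapper '/bin/cat'\n",
"```\n",
"\n",
"If in doubt about the default filesystem, the effective value can be inspected with `hdfs getconf -confKey fs.defaultFS`."
]
},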
{
"cell_type": "markdown",
"metadata": {
"id": "uUbM5R0GwwYw"
},
"source": [
"# Download core Hadoop"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:45:53.005612Z",
"iopub.status.busy": "2024-03-11T15:45:53.005405Z",
"iopub.status.idle": "2024-03-11T15:45:56.608648Z",
"shell.execute_reply": "2024-03-11T15:45:56.607923Z"
},
"id": "jDgQtQlzw8bL",
"outputId": "829df74f-efd1-4484-a374-44fcf2c95b2f"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Not downloading, Hadoop folder hadoop-3.3.6 already exists\n"
]
}
],
"source": [
"HADOOP_URL = \"https://dlcdn.apache.org/hadoop/common/stable/hadoop-3.3.6.tar.gz\"\n",
"\n",
"import requests\n",
"import os\n",
"import tarfile\n",
"\n",
"def download_and_extract_targz(url):\n",
" response = requests.get(url)\n",
" filename = url.rsplit('/', 1)[-1]\n",
" HADOOP_HOME = filename[:-7]\n",
" # set HADOOP_HOME environment variable\n",
" os.environ['HADOOP_HOME'] = HADOOP_HOME\n",
" if os.path.isdir(HADOOP_HOME):\n",
" print(\"Not downloading, Hadoop folder {} already exists\".format(HADOOP_HOME))\n",
" return\n",
" if response.status_code == 200:\n",
" with open(filename, 'wb') as file:\n",
" file.write(response.content)\n",
" with tarfile.open(filename, 'r:gz') as tar_ref:\n",
" extract_path = tar_ref.extractall(path='.')\n",
" # Get the names of all members (files and directories) in the archive\n",
" all_members = tar_ref.getnames()\n",
" # If there is a top-level directory, get its name\n",
" if all_members:\n",
" top_level_directory = all_members[0]\n",
" print(f\"ZIP file downloaded and extracted successfully. Contents saved at: {top_level_directory}\")\n",
" else:\n",
" print(f\"Failed to download ZIP file. Status code: {response.status_code}\")\n",
"\n",
"\n",
"download_and_extract_targz(HADOOP_URL)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "3yvb5cw9xEbh"
},
"source": [
"# Set environment variables"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "u6lkrz1dxIiO"
},
"source": [
"## Set `HADOOP_HOME` and `PATH`"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:45:56.640406Z",
"iopub.status.busy": "2024-03-11T15:45:56.639824Z",
"iopub.status.idle": "2024-03-11T15:45:56.644219Z",
"shell.execute_reply": "2024-03-11T15:45:56.643590Z"
},
"id": "s7maAwaFxBT_",
"outputId": "cdcb2d0e-e387-452f-d601-c3828bffe4ab"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"HADOOP_HOME is hadoop-3.3.6\n",
"PATH is hadoop-3.3.6/bin:/opt/hostedtoolcache/Python/3.8.18/x64/bin:/opt/hostedtoolcache/Python/3.8.18/x64:/snap/bin:/home/runner/.local/bin:/opt/pipx_bin:/home/runner/.cargo/bin:/home/runner/.config/composer/vendor/bin:/usr/local/.ghcup/bin:/home/runner/.dotnet/tools:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games:/usr/local/games:/snap/bin\n"
]
}
],
"source": [
"# HADOOP_HOME was set earlier when downloading Hadoop distribution\n",
"print(\"HADOOP_HOME is {}\".format(os.environ['HADOOP_HOME']))\n",
"\n",
"os.environ['PATH'] = ':'.join([os.path.join(os.environ['HADOOP_HOME'], 'bin'), os.environ['PATH']])\n",
"print(\"PATH is {}\".format(os.environ['PATH']))"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4kzJ8cNoxPyK"
},
"source": [
"## Set `JAVA_HOME`\n",
"\n",
"While Java is readily available on Google Colab, we consider the broader scenario of an Ubuntu machine. In this case, we ensure compatibility by installing Java, specifically opting for the `openjdk-19-jre-headless` version."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:45:56.646756Z",
"iopub.status.busy": "2024-03-11T15:45:56.646422Z",
"iopub.status.idle": "2024-03-11T15:45:56.653209Z",
"shell.execute_reply": "2024-03-11T15:45:56.652596Z"
},
"id": "SauFHVPOxL-Y",
"outputId": "51c06892-7c62-460c-f16d-c8ffd51f2547"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Java is already installed: /usr/lib/jvm/temurin-11-jdk-amd64\n"
]
}
],
"source": [
"import shutil\n",
"\n",
"# set variable JAVA_HOME (install Java if necessary)\n",
"def is_java_installed():\n",
" os.environ['JAVA_HOME'] = os.path.realpath(shutil.which(\"java\")).split('/bin')[0]\n",
" return os.environ['JAVA_HOME']\n",
"\n",
"def install_java():\n",
" # Uncomment and modify the desired version\n",
" # java_version= 'openjdk-11-jre-headless'\n",
" # java_version= 'default-jre'\n",
" # java_version= 'openjdk-17-jre-headless'\n",
" # java_version= 'openjdk-18-jre-headless'\n",
" java_version= 'openjdk-19-jre-headless'\n",
"\n",
" print(f\"Java not found. Installing {java_version} ... (this might take a while)\")\n",
" try:\n",
" cmd = f\"apt install -y {java_version}\"\n",
" subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)\n",
" stdout_result = subprocess_output.stdout\n",
" # Process the results as needed\n",
" print(\"Done installing Java {}\".format(java_version))\n",
" os.environ['JAVA_HOME'] = os.path.realpath(shutil.which(\"java\")).split('/bin')[0]\n",
" print(\"JAVA_HOME is {}\".format(os.environ['JAVA_HOME']))\n",
" except subprocess.CalledProcessError as e:\n",
" # Handle the error if the command returns a non-zero exit code\n",
" print(\"Command failed with return code {}\".format(e.returncode))\n",
" print(\"stdout: {}\".format(e.stdout))\n",
"\n",
"# Install Java if not available\n",
"if is_java_installed():\n",
" print(\"Java is already installed: {}\".format(os.environ['JAVA_HOME']))\n",
"else:\n",
" print(\"Installing Java\")\n",
" install_java()"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "6HFPVX84xbNd"
},
"source": [
"# Run a MapReduce job with Hadoop streaming"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "_yVa55X1xmOb"
},
"source": [
"## Create a file\n",
"\n",
"Write the string\"Hello, World!\" to a local file.
**Note:** you will be writing to the file `./hello.txt` in your current directory (denoted by `./`)."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"execution": {
"iopub.execute_input": "2024-03-11T15:45:56.655904Z",
"iopub.status.busy": "2024-03-11T15:45:56.655523Z",
"iopub.status.idle": "2024-03-11T15:45:56.790282Z",
"shell.execute_reply": "2024-03-11T15:45:56.789590Z"
},
"id": "9Jz7mJkcxYxw"
},
"outputs": [],
"source": [
"!echo \"Hello, World!\">./hello.txt"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "zSh_Kr5Bxvst"
},
"source": [
"## Launch the MapReduce \"Hello, World!\" application\n",
"\n",
"Since the default filesystem is the local filesystem (as opposed to HDFS) we do not need to upload the local file `hello.txt` to HDFS.\n",
"\n",
"Run a MapReduce job with `/bin/cat` as a mapper and no reducer.\n",
"\n",
"**Note:** the first step of removing the output directory is necessary because MapReduce does not overwrite data folders by design."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:45:56.793334Z",
"iopub.status.busy": "2024-03-11T15:45:56.793117Z",
"iopub.status.idle": "2024-03-11T15:46:00.219770Z",
"shell.execute_reply": "2024-03-11T15:46:00.219013Z"
},
"id": "nb5JryK9xpPA",
"outputId": "7ba38b06-d0ee-4606-974d-63d68835700e"
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"rm: `my_output': No such file or directory\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:58,688 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:58,770 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:58,770 INFO impl.MetricsSystemImpl: JobTracker metrics system started\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:58,783 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:58,931 INFO mapred.FileInputFormat: Total input files to process : 1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:58,944 INFO mapreduce.JobSubmitter: number of splits:1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,069 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local370782050_0001\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,070 INFO mapreduce.JobSubmitter: Executing with tokens: []\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,164 INFO mapreduce.Job: The url to track the job: http://localhost:8080/\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,166 INFO mapreduce.Job: Running job: job_local370782050_0001\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,172 INFO mapred.LocalJobRunner: OutputCommitter set in config null\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,175 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,180 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,181 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,212 INFO mapred.LocalJobRunner: Waiting for map tasks\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,217 INFO mapred.LocalJobRunner: Starting task: attempt_local370782050_0001_m_000000_0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,237 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,237 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,253 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,258 INFO mapred.MapTask: Processing split: file:/home/runner/work/big_data/big_data/hello.txt:0+14\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,265 INFO mapred.MapTask: numReduceTasks: 1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,281 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,281 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,281 INFO mapred.MapTask: soft limit at 83886080\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,281 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,281 INFO mapred.MapTask: kvstart = 26214396; length = 6553600\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,284 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,288 INFO streaming.PipeMapRed: PipeMapRed exec [/bin/cat]\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,292 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,294 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,294 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,294 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,295 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,295 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,296 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,296 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,296 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,297 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,297 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,303 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,319 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,319 INFO streaming.PipeMapRed: Records R/W=1/1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,320 INFO streaming.PipeMapRed: MRErrorThread done\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,321 INFO streaming.PipeMapRed: mapRedFinished\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,323 INFO mapred.LocalJobRunner: \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,323 INFO mapred.MapTask: Starting flush of map output\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,323 INFO mapred.MapTask: Spilling map output\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,323 INFO mapred.MapTask: bufstart = 0; bufend = 15; bufvoid = 104857600\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,323 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,328 INFO mapred.MapTask: Finished spill 0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,338 INFO mapred.Task: Task:attempt_local370782050_0001_m_000000_0 is done. And is in the process of committing\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,340 INFO mapred.LocalJobRunner: Records R/W=1/1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,340 INFO mapred.Task: Task 'attempt_local370782050_0001_m_000000_0' done.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,346 INFO mapred.Task: Final Counters for attempt_local370782050_0001_m_000000_0: Counters: 17\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile System Counters\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of bytes read=141437\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of bytes written=781694\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of read operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of large read operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of write operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tMap-Reduce Framework\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap input records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap output records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap output bytes=15\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap output materialized bytes=23\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tInput split bytes=102\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCombine input records=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tSpilled Records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFailed Shuffles=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMerged Map outputs=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tGC time elapsed (ms)=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tTotal committed heap usage (bytes)=314572800\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile Input Format Counters \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tBytes Read=14\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,347 INFO mapred.LocalJobRunner: Finishing task: attempt_local370782050_0001_m_000000_0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,350 INFO mapred.LocalJobRunner: map task executor complete.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,355 INFO mapred.LocalJobRunner: Waiting for reduce tasks\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,355 INFO mapred.LocalJobRunner: Starting task: attempt_local370782050_0001_r_000000_0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,360 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 2\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,360 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,360 INFO mapred.Task: Using ResourceCalculatorProcessTree : [ ]\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,363 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@9ffba29\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,368 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,380 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=2933076736, maxSingleShuffleLimit=733269184, mergeThreshold=1935830784, ioSortFactor=10, memToMemMergeOutputsThreshold=10\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,382 INFO reduce.EventFetcher: attempt_local370782050_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,407 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local370782050_0001_m_000000_0 decomp: 19 len: 23 to MEMORY\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,410 INFO reduce.InMemoryMapOutput: Read 19 bytes from map-output for attempt_local370782050_0001_m_000000_0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,412 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 19, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->19\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,415 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,416 INFO mapred.LocalJobRunner: 1 / 1 copied.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,416 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,421 INFO mapred.Merger: Merging 1 sorted segments\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,421 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 3 bytes\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,422 INFO reduce.MergeManagerImpl: Merged 1 segments, 19 bytes to disk to satisfy reduce memory limit\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,435 INFO reduce.MergeManagerImpl: Merging 1 files, 23 bytes from disk\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,436 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,436 INFO mapred.Merger: Merging 1 sorted segments\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,438 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 3 bytes\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,439 INFO mapred.LocalJobRunner: 1 / 1 copied.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,444 INFO mapred.Task: Task:attempt_local370782050_0001_r_000000_0 is done. And is in the process of committing\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,445 INFO mapred.LocalJobRunner: 1 / 1 copied.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,445 INFO mapred.Task: Task attempt_local370782050_0001_r_000000_0 is allowed to commit now\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,447 INFO output.FileOutputCommitter: Saved output of task 'attempt_local370782050_0001_r_000000_0' to file:/home/runner/work/big_data/big_data/my_output\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,447 INFO mapred.LocalJobRunner: reduce > reduce\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,447 INFO mapred.Task: Task 'attempt_local370782050_0001_r_000000_0' done.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,448 INFO mapred.Task: Final Counters for attempt_local370782050_0001_r_000000_0: Counters: 24\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile System Counters\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of bytes read=141515\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of bytes written=781744\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of read operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of large read operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of write operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tMap-Reduce Framework\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCombine input records=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCombine output records=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce input groups=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce shuffle bytes=23\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce input records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce output records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tSpilled Records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tShuffled Maps =1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFailed Shuffles=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMerged Map outputs=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tGC time elapsed (ms)=12\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tTotal committed heap usage (bytes)=314572800\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tShuffle Errors\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tBAD_ID=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCONNECTION=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tIO_ERROR=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tWRONG_LENGTH=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tWRONG_MAP=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tWRONG_REDUCE=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile Output Format Counters \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tBytes Written=27\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,448 INFO mapred.LocalJobRunner: Finishing task: attempt_local370782050_0001_r_000000_0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:45:59,448 INFO mapred.LocalJobRunner: reduce task executor complete.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:46:00,171 INFO mapreduce.Job: Job job_local370782050_0001 running in uber mode : false\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:46:00,172 INFO mapreduce.Job: map 100% reduce 100%\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:46:00,173 INFO mapreduce.Job: Job job_local370782050_0001 completed successfully\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:46:00,178 INFO mapreduce.Job: Counters: 30\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile System Counters\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of bytes read=282952\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of bytes written=1563438\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of read operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of large read operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFILE: Number of write operations=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tMap-Reduce Framework\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap input records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap output records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap output bytes=15\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMap output materialized bytes=23\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tInput split bytes=102\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCombine input records=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCombine output records=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce input groups=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce shuffle bytes=23\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce input records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tReduce output records=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tSpilled Records=2\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tShuffled Maps =1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tFailed Shuffles=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tMerged Map outputs=1\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tGC time elapsed (ms)=12\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tTotal committed heap usage (bytes)=629145600\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tShuffle Errors\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tBAD_ID=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tCONNECTION=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tIO_ERROR=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tWRONG_LENGTH=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tWRONG_MAP=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tWRONG_REDUCE=0\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile Input Format Counters \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tBytes Read=14\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\tFile Output Format Counters \n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\t\tBytes Written=27\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"2024-03-11 15:46:00,179 INFO streaming.StreamJob: Output directory: my_output\n"
]
}
],
"source": [
"%%bash\n",
"hdfs dfs -rm -r my_output\n",
"\n",
"mapred streaming \\\n",
" -input hello.txt \\\n",
" -output my_output \\\n",
" -mapper '/bin/cat'"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "OB_fX9u5x55y"
},
"source": [
"## Verify the result\n",
"\n",
"If the job executed successfully, an empty file named `_SUCCESS` is expected to be present in the output directory `my_output`.\n",
"\n",
"Verify the success of the MapReduce job by checking for the presence of the `_SUCCESS` file."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:46:00.222553Z",
"iopub.status.busy": "2024-03-11T15:46:00.222179Z",
"iopub.status.idle": "2024-03-11T15:46:01.136014Z",
"shell.execute_reply": "2024-03-11T15:46:01.135356Z"
},
"id": "bnvEvYDfx2g4",
"outputId": "d8c3dd75-aca5-4165-aaeb-f57e4e377293"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Check if MapReduce job was successful\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"_SUCCESS exists!\n"
]
}
],
"source": [
"%%bash\n",
"\n",
"echo \"Check if MapReduce job was successful\"\n",
"hdfs dfs -test -e my_output/_SUCCESS\n",
"if [ $? -eq 0 ]; then\n",
"\techo \"_SUCCESS exists!\"\n",
"fi"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BLMnBh44x_YR"
},
"source": [
"**Note:** `hdfs dfs -ls` is the same as `ls` since the default filesystem is the local filesystem."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:46:01.138836Z",
"iopub.status.busy": "2024-03-11T15:46:01.138564Z",
"iopub.status.idle": "2024-03-11T15:46:02.243021Z",
"shell.execute_reply": "2024-03-11T15:46:02.242350Z"
},
"id": "ufAfmGUvx8jW",
"outputId": "3ab8e401-6151-4241-c3b7-b951c71977c6"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 2 items\r\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"-rw-r--r-- 1 runner docker 0 2024-03-11 15:45 my_output/_SUCCESS\r\n",
"-rw-r--r-- 1 runner docker 15 2024-03-11 15:45 my_output/part-00000\r\n"
]
}
],
"source": [
"!hdfs dfs -ls my_output"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:46:02.246227Z",
"iopub.status.busy": "2024-03-11T15:46:02.245769Z",
"iopub.status.idle": "2024-03-11T15:46:02.382457Z",
"shell.execute_reply": "2024-03-11T15:46:02.381789Z"
},
"id": "ZnKSahPzyCAn",
"outputId": "6e94b1ce-fea5-4e72-bbf7-a1c58b820a14"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"total 4\r\n",
"-rw-r--r-- 1 runner docker 0 Mar 11 15:45 _SUCCESS\r\n",
"-rw-r--r-- 1 runner docker 15 Mar 11 15:45 part-00000\r\n"
]
}
],
"source": [
"!ls -l my_output"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "v9LmpcaMyG23"
},
"source": [
"The actual output of the MapReduce job is contained in the file `part-00000` in the output directory."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:46:02.385529Z",
"iopub.status.busy": "2024-03-11T15:46:02.385260Z",
"iopub.status.idle": "2024-03-11T15:46:02.521495Z",
"shell.execute_reply": "2024-03-11T15:46:02.520819Z"
},
"id": "eL-Clat5yD8I",
"outputId": "d2340516-50fd-4945-bda1-4510f9a82885"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Hello, World!\t\r\n"
]
}
],
"source": [
"!cat my_output/part-00000"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "AmpHr_HyyMnM"
},
"source": [
"# MapReduce without specifying mapper or reducer\n",
"\n",
"In the previous example, we have seen how to run a MapReduce job without specifying any reducer.\n",
"\n",
"Since the only required options for `mapred streaming` are `input` and `output`, we can also run a MapReduce job without specifying a mapper."
]
},
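{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of such an invocation, relying entirely on the default `IdentityMapper` and `IdentityReducer` (the output directory name is illustrative):\n",
"\n",
"```bash\n",
"# sketch: neither -mapper nor -reducer is given, so the identity defaults are used\n",
"mapred streaming \\\n",
"  -input hello.txt \\\n",
"  -output my_output_identity\n",
"```"
]
},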
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"execution": {
"iopub.execute_input": "2024-03-11T15:46:02.524604Z",
"iopub.status.busy": "2024-03-11T15:46:02.524347Z",
"iopub.status.idle": "2024-03-11T15:46:03.310013Z",
"shell.execute_reply": "2024-03-11T15:46:03.309201Z"
},
"id": "ZPWL1AiXyJac",
"outputId": "3dfc9d86-85b9-497d-bfba-97f890a3424b"
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-03-11 15:46:03,162 ERROR streaming.StreamJob: Unrecognized option: -h\r\n",
"Usage: $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar [options]\r\n",
"Options:\r\n",
" -input