{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Hadoop spilling\n", "\n", "_Spill_ in the context of Mapreduce means writing data from memory to disk. Spilling occurs whenever there is not enough memory to fit all the mapper output during a sorting operation. The amount of memory available is set in **mapreduce.task.io.sort.mb**.\n", "\n", "\n", "\n", "From https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html:\n", "\n", "_A record emitted from a map will be serialized into a buffer and metadata will be stored into accounting buffers. When either the serialization buffer or the metadata exceed a threshold, the contents of the buffers will be sorted and written to disk in the background while the map continues to output records. If either buffer fills completely while the spill is in progress, the map thread will block. When the map is finished, any remaining records are written to disk and all on-disk segments are merged into a single file. Minimizing the number of spills to disk can decrease map time, but a larger buffer also decreases the memory available to the mapper._\n", "\n", "The value of **mapreduce.task.io.sort.mb** should clearly be smaller than that **mapreduce_map_memory_mb** since the mapper needs memory not only for sorting.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Default values in the Yarn configuration\n", "\n", "\n", " \n", "
PROPERTY | \n", "VALUE | \n", "DESCRIPTION | \n", "
mapreduce.task.io.sort.factor | \n", "10 | \n", "The number of streams to merge at the same time while sorting files. That is, the number of sort heads to use during the merge sort on the reducer side. This determines the number of open file handles. Merging more files in parallel reduces merge sort iterations and improves run time by eliminating disk I/O. Note that merging more files in parallel uses more memory. If 'io.sort.factor' is set too high or the maximum JVM heap is set too low, excessive garbage collection will occur. The Hadoop default is 10, but Cloudera recommends a higher value. Will be part of generated client configuration. | \n", "
mapreduce.task.io.sort.mb | \n", "100 | \n", "The total amount of memory buffer, in megabytes, to use while sorting files. Note that this memory comes out of the user JVM heap size (meaning total user JVM heap - this amount of memory = total user usable heap space. Note that Cloudera's default differs from Hadoop's default; Cloudera uses a bigger buffer by default because modern machines often have more RAM. The smallest value across all TaskTrackers will be part of generated client configuration. | \n", "
mapreduce.map.sort.spill.percent | \n", "0.80 | \n", "The soft limit in either the buffer or record collection buffers. When this limit is reached, a thread will begin to spill the contents to disk in the background. Note that this does not imply any chunking of data to the spill. A value less than 0.5 is not recommended. The syntax is in decimal units; the default is 80% and is formatted 0.8. Will be part of generated client configuration. | \n", "
mapreduce_map_memory_mb | \n", "512 | \n", "The amount of memory to request from the scheduler for each map task. | \n", "
2019-04-25 09:55:22,949 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 67\n", "2019-04-25 09:55:22,950 INFO [main] org.apache.hadoop.mapred.MapTask: (RESET) equator 239326784 kv 59831692(239326768) kvi 57688416(230753664)\n", "2019-04-25 09:55:26,118 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output\n", "2019-04-25 09:55:26,130 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 239326784; bufend = 151336447; bufvoid = 268435456\n", "2019-04-25 09:55:26,130 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 59831692(239326768); kvend = 51255868(205023472); length = 8575825/16777216\n", "2019-04-25 09:55:26,130 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 159926367 kvi 39981584(159926336)\n", "2019-04-25 09:55:34,315 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 68\n", "2019-04-25 09:55:34,315 INFO [main] org.apache.hadoop.mapred.MapTask: (RESET) equator 159926367 kv 39981584(159926336) kvi 37838320(151353280)\n", "2019-04-25 09:55:37,877 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output\n", "2019-04-25 09:55:37,878 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 159926367; bufend = 71936148; bufvoid = 268435456\n", "2019-04-25 09:55:37,878 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 39981584(159926336); kvend = 31405800(125623200); length = 8575785/16777216\n", "2019-04-25 09:55:37,878 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 80526068 kvi 20131512(80526048)\n", "2019-04-25 09:55:46,365 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 69\n", "\n", "----\n", "
\n", "2020-04-17 09:40:14,513 INFO [main] org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 268435452 bytes\n", "2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output\n", "2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 180996067; bufvoid = 268435456\n", "2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 67108860(268435440); kvend = 67108860(268435440); length = 1/16777216\n", "2020-04-17 09:41:24,960 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 180996067 kvi 45249012(180996048)\n", "2020-04-17 09:41:25,474 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 1036\n", "2020-04-17 09:41:25,474 INFO [main] org.apache.hadoop.mapred.MapTask: (RESET) equator 180996067 kv 45249012(180996048) kvi 45249012(180996048)\n", "2020-04-17 09:41:25,690 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 67108860(268435440)\n", "2020-04-17 09:41:25,690 INFO [main] org.apache.hadoop.mapred.MapTask: Record too large for in-memory buffer: 268435452 bytes\n", "2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output\n", "2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 180996067; bufvoid = 268435456\n", "2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 67108860(268435440); kvend = 67108860(268435440); length = 1/16777216\n", "2020-04-17 09:42:45,023 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 180996067 kvi 45249012(180996048)\n", "2020-04-17 09:42:45,419 INFO [SpillThread] org.apache.hadoop.mapred.MapTask: Finished spill 1038\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### How to prevent data spill\n", "\n", "Spilling reduces the performance of mapreduce jobs because of the overhead of writing data to disk that blocks the map thread, hence it should be as much as possible be avoided. Ideally, one should strive for at most one spill per mapper.\n", "\n", "To prevent data spill one needs to make sure that
io.sort.mb
is less than the size of the data emitted from the mapper. This can be achieved by:\n",
" - increasing mapreduce.task.io.sort.mb
(but making sure that enough memory remains available for the mapper task)\n",
" - adjust the mapper so that the records it emits fit into io.sort.mb
. This can be achieved for instance by splitting the data more so as to increase the number of mappers\n",
" \n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"But maximizing parallelization might not be always the best choice because of the overhead required to start each single map task and also because this way of splitting the data doesn't take into account the HDFS block size. Since the hadoop file system saves files into blocks and distributes them across the datanodes in the cluster, it is always convenient to split the data into chunks that are a multiple of the cluster's HDFS block size.\n",
"\n",
"The default Hadoop block size is defined as\n",
"\n",
"dfs.block.size = 128MiB
\n",
"\n",
"so for instance a good choice for a file of size $1$GB (note: $1$GB = $953,674$MiB) would be \n",
"$953,674/12800 = 74$.\n",
"\n",
"See also: Hadoop Wiki: Partitioning your job into maps and reduces https://wiki.apache.org/hadoop/HowManyMapsAndReduces\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Further links\n",
"\n",
"\n",
"- Optimizing MapReduce Job performance https://www.slideshare.net/Hadoop_Summit/optimizing-mapreduce-job-performance\n",
"- IBM Hadoop/Yarn tuning https://www.ibm.com/support/knowledgecenter/STXKQY_BDA_SHR/bl1bda_commontuning.htm\n",
"- MapReduce Tutorial by Apache Hadoop http://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}