{ "cells": [ { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "# Benchmarking your Hadoop file system with TestDFSIO \n" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Outline\n", "\n", "- Introduction\n", "- HDFS: the Hadoop Distributed File System \n", "- TestDFSIO basic usage\n", "- How to interpret the results\n", "- Clean up\n", "- Useful links\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Introduction\n", "\n", "HDFS (Hadoop Distributed File System) is the basis of the Hadoop Big Data framework. HDFS offers fault tolerance thanks to the replication of data across the underlying cluster. TestDFSIO is a tool included in the Hadoop software distribution that is meant to measure the data read and write performance of your HDFS installation." ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## HDFS: the Hadoop Distributed File System \n", "\n", "Large amounts of data are collected every day that traditional technologies cannot handle. \"Big Data\" has become the catch-all term for massive amounts of data as well as for frameworks and R&D initiatives aimed at working with it efficiently. [Apache Hadoop](https://hadoop.apache.org/) is one of the most widely adopted such frameworks for Big Data processing. \n", "\n", "The two core component of Hadoop are the Hadoop Distributed File System HDFS and the cluster and resource manager YARN (Yet Another Resource Negotiatior).\n", "\n", "The file system HDFS is designed to run on clusters of computers and provides a reliable and efficient solution for working with large (in the GB or even TB scale) datasets. HDFS is written in Java and was originally inspired by the Google File System (GFS or GoogleFS), providing its open-source alternative.\n", "\n", "One of the core features of HDFS is *fault tolerance* or *high availability*: this is the capability of a system to maintain continuous operations even in the event of failure of one of its parts (both hardware or software). Fault tolerance in HDFS is achieved through *data replication*: data is simply stored in multiple copies across different hosts. \n", "\n", "Here is a diagram showing the architecture of a Hadoop cluster. The NameNode is responsible for storing the filesystem's metadata, whereas DataNodes store the actual data. While data storage is fault-tolerant, the name node might constitute a single point-of-failure. Therefore typically one standby NameNode service runs on one of the data nodes, providing a backup in case of failure of the main NameNode. \n", "\n", "\n", "![HDFS Architecture](HDFS_Architecture.png)", "\n", "Hadoop's file system reliability through data replication clearly comes at the cost of performance. Many data processing scenarios (also known as extract, transform, load, or ETL procedures) consist a sequence of MapReduce jobs. 
"\n", "## TestDFSIO\n", "\n", "TestDFSIO is a tool that measures the performance of read and write operations on HDFS; it can be used to benchmark or stress-test a Hadoop cluster.\n", "\n", "TestDFSIO uses `MapReduce` to write files to the HDFS filesystem, spawning one mapper per file; the reducer is used to collect and summarize the test data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### TestDFSIO basic usage\n", "\n", "To run write or read tests you first need to locate the test jar `hadoop-mapreduce-client-jobclient*tests.jar`. In our Cloudera installation this is located in:\n", "\n", "`/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-tests.jar`.\n", "\n", "### Options\n", "- `-write` to run write tests\n", "- `-read` to run read tests\n", "- `-nrFiles` the number of files (equal to the number of mappers)\n", "- `-fileSize` the size of each file (followed by B|KB|MB|GB|TB)\n", "\n", "TestDFSIO generates exactly 1 map task per file, so it is a 1:1 mapping from files to map tasks.\n", "\n", "### Specify a custom I/O directory\n", "\n", "By default TestDFSIO uses the HDFS directory `/benchmarks/TestDFSIO` for reading and writing, so you need read/write access to it; it is therefore recommended to run the tests as the `hdfs` user.\n", "\n", "If you want to run the tests as a user who has no write permissions on HDFS' root folder (`/`), you can avoid permission problems by specifying an alternative directory with `-D`, assigning a new value to `test.build.data` (e.g. `-D test.build.data=/user/${USER}/benchmarks`), as in the tests below:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Running a write test\n", "\n", "We are going to run a test with `nrFiles` files, each of size `fileSize`, using a custom output directory." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "export myDir=/user/${USER}/benchmarks\n", "export nrFiles=80\n", "export fileSize=10GB\n", "cd ~\n", "yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-tests.jar TestDFSIO -D test.build.data=$myDir -write -nrFiles $nrFiles -fileSize $fileSize" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Output 25.7.2019 (20 files of size 10GB, 200GB total)\n", "\n", "Note: in order to involve more than one node in the read/write operations, we need to configure the test job with more than 46 map tasks (where nr. of map tasks = nr. of files).
This is because each node has 48 virtual CPUs, of which 46 are available for tasks (one physical CPU, i.e. two virtual CPUs, is reserved for the node's OS and bookkeeping).\n", "\n", "~~~bash\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: Date & time: Thu Jul 25 15:07:02 CEST 2019\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: Number of files: 20\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: Total MBytes processed: 204800.0\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: Throughput mb/sec: 38.77458674204487\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: Average IO rate mb/sec: 39.016483306884766\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: IO rate std deviation: 3.2993795422143273\n", "19/07/25 15:07:02 INFO fs.TestDFSIO: Test exec time sec: 301.748\n", "~~~" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Running a read test\n", "\n", "We are going to run a test with `nrFiles` files, each of size `fileSize`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "export myDir=/user/${USER}/benchmarks\n", "export nrFiles=10\n", "export fileSize=10GB\n", "cd ~\n", "yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-tests.jar TestDFSIO -D test.build.data=$myDir -read -nrFiles $nrFiles -fileSize $fileSize" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Output 25.7.2019 (20 files of size 10GB, 200GB total)\n", "\n", "~~~bash\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: Date & time: Thu Jul 25 15:13:39 CEST 2019\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: Number of files: 20\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: Total MBytes processed: 204800.0\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: Throughput mb/sec: 72.07788071321907\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: Average IO rate mb/sec: 97.98143005371094\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: IO rate std deviation: 83.42162014957364\n", "19/07/25 15:13:39 INFO fs.TestDFSIO: Test exec time sec: 185.435\n", "~~~" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run a write test with replication=1\n", "\n", "In order to measure the effect of replication on the overall throughput, we are going to run a write test with replication factor 1 (no replication).\n",
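"\n", "After the test in the next cell has finished, you can check that the benchmark files were indeed written with a single replica. A minimal sketch (for files, the replication factor appears in the second column of `hdfs dfs -ls` output):\n", "\n", "~~~bash\n", "# Recursively list the benchmark directory; the second column of each file\n", "# entry is its replication factor (expected to be 1 for this test).\n", "hdfs dfs -ls -R /user/${USER}/benchmarks\n", "~~~"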
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "export myDir=/user/${USER}/benchmarks\n", "export nrFiles=10\n", "export fileSize=10GB\n", "cd ~\n", "hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-tests.jar TestDFSIO -D test.build.data=$myDir -D dfs.replication=1 -write -nrFiles 20 -fileSize 10GB" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Output 1 (20 files of size 10GB, 200GB total)\n", "\n", "~~~bash\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: Date & time: Thu Jul 25 15:44:26 CEST 2019\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: Number of files: 20\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: Total MBytes processed: 204800.0\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: Throughput mb/sec: 39.38465325447428\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: Average IO rate mb/sec: 39.59946060180664\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: IO rate std deviation: 3.0182194679812717\n", "19/07/25 15:44:26 INFO fs.TestDFSIO: Test exec time sec: 292.66\n", "~~~\n", "\n", "#### Output 2 (20 files of size 10GB, 200GB total)\n", "~~~bash\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: Date & time: Thu Sep 12 14:03:49 CEST 2019\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: Number of files: 20\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: Total MBytes processed: 204800\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: Throughput mb/sec: 41.18\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: Average IO rate mb/sec: 41.22\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: IO rate std deviation: 1.33\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: Test exec time sec: 275.41\n", "19/09/12 14:03:49 INFO fs.TestDFSIO: \n", "~~~\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Advanced test configuration\n", " \n", "### `read` options\n", "In addition to the default sequential file access, the mapper class for reading data can be configured to perform various types of random reads:\n", " * _random read_ always chooses a random position to read from (skipSize = 0)\n", " * _backward read_ reads file in reverse order (skipSize < 0)\n", " * _skip-read skips_ skipSize bytes after every read (skipSize > 0)\n", " \n", "### Using compression\n", "\n", "The `-compression` option allows to specify a _codec_ for the input and output of data.\n", "\n", "#### What is a codec\n", "\n", "Codec is a portmanteau of coder and decoder and it designates any hardware or software device that is used to encode—most commonly also reducing the original size—and decode information. Hadoop provides classes that encapsulate compression and decompression algorithms, such as `GzipCodec` for the gzip algorithm. 
\n", "\n", "These are all currently available Hadoop compression codecs:\n", "\n", "| Compression format | Hadoop CompressionCodec | \n", "| --- | --- | \n", "|DEFLATE |\torg.apache.hadoop.io.compress.DefaultCodec|\n", "|gzip |\torg.apache.hadoop.io.compress.GzipCodec|\n", "|bzip2 |\torg.apache.hadoop.io.compress.BZip2Codec|\n", "|LZO |\tcom.hadoop.compression.lzo.LzopCodec|\n", "|LZ4 |\torg.apache.hadoop.io.compress.Lz4Codec|\n", "|Snappy |\torg.apache.hadoop.io.compress.SnappyCodec|\n", "\n", "### More options\n", "\n", "For more options see usage:\n", "\n", "`$ hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-tests.jar TestDFSIO -help\n", "Usage: TestDFSIO [genericOptions] -read [-random | -backward | -skip [-skipSize Size]] | -write | -append | -clean [-compression codecClassName] [-nrFiles N] [-size Size[B|KB|MB|GB|TB]] [-resFile resultFileName] [-bufferSize Bytes]`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## How to interpret the results\n", "\n", "The main measurements produced by the HDFSio test are:\n", "- _throughput_ in mb/sec\n", "- average _IO rate_ in mb/sec\n", "- standard deviation of the IO rate\n", "- test execution time\n", "\n", "All test results are logged by default to the file `TestDFSIO_results.log`. The log file can be changed with the option `-resFile resultFileName`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!tail -8 cd ~/TestDFSIO_results.log" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### What is throughput\n", "Throughput or data transfer rate measures the amount of data read or written (expressed in Megabytes per second -- MB/s) to the filesystem.\n", "\n", "Throughput is one of the main performance measures used by disk manufacturers as knowing how fast data can be moved around in a disk is an important important factor to look at. \n", "\n", "#### What is IO rate\n", "\n", "IO rate also abbreviated as IOPS measures IO operations per second, which means the amount of read or write operations that could be done in one seconds time. A certain amount of IO operations will also give a certain throughput of Megabytes each second, so these two are related. The linking factor is the size of each IO request. This may vary depending on the operating system and the application or service that needs disk access. \n", "\n", "Throughput and IO rate in a formula:\n", "\n", " Average IO size x IOPS = Throughput in MB/s\n", " \n", "The block size in our HDFS installation is the standard value of 128MiB \n", "\n", "#### Concurrent versus overall throughput\n", "\n", "The listed throughput shows the average throughput among all the map tasks. \n", "To get an approximate overall throughput on the cluster you can divide the total MBytes by the test execution time in seconds." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up\n", "\n", "To remove test files and free up space after completing the test use the option `clean`. \n", "\n", "Note that files will be overwritten, so if you perform a series of tests using the same directory it is only necessary to clean up at the end." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "export myDir=/user/${USER}/benchmarks\n", "hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-client-jobclient-tests.jar TestDFSIO -D test.build.data=$myDir -clean" ] }, { "cell_type": "markdown", "metadata": { "slideshow": { "slide_type": "slide" } }, "source": [ "## Further reading and some useful links\n", "\n", "TestDFSIO does not have an official documentation but the source file [TestDFSIO.java](https://github.com/c9n/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java) is available in the Apache source code repository.\n", "\n", "- [Distributed I/O Benchmark of HDFS](https://bdaafall2015.readthedocs.io/en/latest/dfsio.html)\n", "- [Benchmarking and Stress Testing an Hadoop Cluster with TeraSort, TestDFSIO & Co.](http://www.michael-noll.com/blog/2011/04/09/benchmarking-and-stress-testing-an-hadoop-cluster-with-terasort-testdfsio-nnbench-mrbench/)\n", "- [Improve TestDFSIO](https://issues.apache.org/jira/browse/HDFS-1338)\n", "- [Running Hadoop Benchmarking TestDFSIO on Cloudera Clusters](https://anoopkm.wordpress.com/2015/10/19/running-hadoop-benchmarking-testdfsio-on-cloudera-clusters/)\n", "- [Hadoop Performance Evaluation by Benchmarking and Stress Testing with TeraSort and TestDFSIO](https://medium.com/ymedialabs-innovation/hadoop-performance-evaluation-by-benchmarking-and-stress-testing-with-terasort-and-testdfsio-444b22c77db2)\n", "- [TestDFSIO source code on ](https://github.com/BBVA/spark-benchmarks/blob/master/docs/TestDFSIO.md)\n" ] } ], "metadata": { "css": [ "" ], "kernelspec": { "display_name": "PySpark3 (Spark 2.3)", "language": "python", "name": "pyspark3kernel" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.9" } }, "nbformat": 4, "nbformat_minor": 2 }