{ "metadata": { "name": "", "signature": "sha256:7451971cb76e1fcaf7ba5b862c795f16406edc635f95b2718023a635cc66b167" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# MPI\u2014Message Passing Interface\n", "[MPI](http://www.mcs.anl.gov/research/projects/mpi/) is a library specification describing a parallel programming paradigm suitable for use on distributed-memory machines, such as modern supercomputers and distributed clusters.\n", "\n", "In this lesson, we will review parallel programming concepts and introduce several useful MPI functions and concepts. We will work within this [IPython](ipython.org) notebook, which allows us to lay out our code, results, and commentary in the same interface. Feel free to open up a `bash` shell in the background if you prefer to look at and execute your code that way.\n", "\n", "You may need to load Canopy on the UIUC EWS workstations first:\n", "\n", " module load canopy\n", " cd\n", " ipython notebook" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Background: Parallel Programming Paradigms\n", "\n", "To better contextualize this, let us briefly review the basic parallel computing models in use today.\n", "\n", "### Computer Architecture\n", "\n", "A classical computer processor receives instructions as sets of binary data from memory and uses these instructions (in assembly language) to manipulate data from memory in predictable ways. A processor works on a single piece of data at a time (although other data units may be waiting in the registers or cache) and executes a single thread, or sequential list, of commands on the data.\n", "\n", "Thus we may say that a conventional desktop computer is SISD\u2014*S*ingle *I*nstruction, *S*ingle *D*ata: a single program operates on a single data element sequentially, then exchanges it back into memory and retrieves a new datum. Any apparent multitasking is due to the operating system periodically switching the CPU's context from one thread to another. We will represent this scenario in the following graphic, which shows a single processor interacting via a _bus_ (the wavy black band) with a collection of memory chips (which can be RAM, ROM, hard drives, etc.).\n", "\n", "![](./img/SISD-base.png)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### SIMD\u2014*S*ingle *I*nstruction, *M*ultiple *D*ata\n", "\n", "Springboarding off of SISD, perhaps the simplest way to think about a way of parallel programming is the SIMD model. In this model, multiple processing units operate simultaneously on multiple pieces of data _in the same way_.\n", "\n", "Consider, for instance, the addition of two eight-element vectors. 
If we have eight processors available, then each processor can add two corresponding elements from the two vectors directly, and the program yields the complete eight-element sum in a single parallel step (if we assume little to no overhead cost for the _vectorization_ of the program).\n", "\n", "![](./img/SIMD-vector-add.png)\n", "\n",
"(This, incidentally, was the major advantage of the first Cray supercomputers in the late 1970s: vectorization let them operate on many data elements simultaneously, thus achieving stupendous speedups.)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### MIMD\u2014*M*ultiple *I*nstruction, *M*ultiple *D*ata\n", "\n",
"Now it's not a major leap to think about using each of those processors to do something _different_ to their assigned data element. It may not make much sense with vector addition, but when performing more complex operations like finite-element solution of differential equations or Monte Carlo integration, we often want a lot of plates spinning.\n", "\n",
"For MIMD to be effective, we need two different kinds of parallelism: data-level parallelism (can I segment my problem domain into complementary parts, whether in real space, time, or phase space?) and control-level parallelism (do I need different parts of my program to execute differently based on the data allotted to them?).\n", "\n",
"Imagine trying to calculate $\\pi$ by throwing darts at a circle. We can count the number of darts that hit within the radius of the circle, $n_{\\text{circle}}$, and compare it with the number of darts that land inside a bounding square but outside the circle, $n_{\\text{square}}$. (Physical simulation of this process is left as an exercise to the reader.) As we know the equations defining the areas, we can obtain $\\pi$ trivially:\n",
"$$\\begin{array}{l} A_\\text{circle} = \\pi r^2 \\\\ A_\\text{square} = 4 r^2 \\end{array} \\implies r^2 = \\frac{A_\\text{square}}{4} = \\frac{A_\\text{circle}}{\\pi} \\implies \\pi \\approx 4 \\frac{n_\\text{circle}}{n_\\text{square} + n_\\text{circle}} \\text{.}$$\n", "\n", "![](./img/darts.png)\n", "\n",
"It is apparent how this algorithm can benefit from parallelization: since any one dart is thrown independently of the others, we are not restricted from throwing a large number simultaneously. One possible algorithm could look like this:\n", "\n",
"    initialize_memory\n", "    parallel {\n", "        throw_dart\n", "        total_darts = total_darts + 1\n", "        count_my_darts_in_circle\n", "    }\n", "    add_all_darts_in_circle\n", "    add_all_darts\n", "    pi = 4 * darts_in_circle / total_darts\n", "\n",
"As you can see, in the parallel portion it doesn't matter to one processor what any other processor is doing. The only time we need all of the processors in sync is at the end of the parallel section, where we add up all the darts thrown and all those that hit the circle. (We will return to this point later.)\n", "\n",
"Let's take a look at the architecture of MIMD machines in a bit more detail now. The major division in MIMD programming is shared-memory _v._ distributed-memory systems.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Shared-Memory Systems**\n", "\n", "If the system architecture allows any processor to directly access any location in global memory space, then we refer to the machine as having a _shared memory_. This is convenient in practice for the programmer but can lead to inefficiencies in hardware and software library design. 
For instance, scaling beyond 32 or 64 processors has been a persistent problem for this architecture choice (contrast tens of thousands of processors for well-designed distributed-memory systems).\n", "\n", "Examples of shared-memory parallelization specifications and libraries include [OpenMP](http://www.openmp.org/) and [OpenACC](http://www.openacc.org/).\n", "\n", "![](./img/MIMD-SM-base.png)\n", "\n", "Addressing the dart-throwing problem above in the shared-memory paradigm is trivial, since each process can contribute its value from its unique memory location to the final result (a process known as _reduction_)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Distributed-Memory Systems**\n", "\n", "For distributed-memory MIMD architectures, every processor is alloted its own physical memory and has no knowledge of other processors' memory. This is the classic problem which MPI was designed to address: we now have to co\u00f6rdinate and communicate data across an internal network to allow these processors to work effectively together.\n", "\n", "![](./img/MIMD-DM-base.png)\n", "\n", "The dart-throwing example from before is now nontrivial, but completely scalable (subject to communication overhead). Every processor is independent and can calculate for one or many darts thrown. However, some sort of reduction must be performed to obtain a final coherent result for the number of darts within the circle (the total number should be known _a priori_ from the number of processes, presumably). The pseudocode in this case looks more like this:\n", "\n", " initialize_memory\n", " parallel {\n", " throw_dart\n", " total_darts = total_darts + 1\n", " count_my_darts_in_circle\n", " }\n", " darts_in_circle = reduction_over_all_processors_of_darts_in_their_circles\n", " add_all_darts\n", " pi = 4 * darts_in_circle / total_darts\n", "\n", "We will shortly examine this case as implemented in MPI." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## MPI Basics" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hello World\n", "\n", "The classical example for any new programming language or library is to construct a nontrivial \"Hello World!\" program. In the following code (`./src/mpi-mwe/c/hello_world_mpi.c`), we will see the basic elements of any MPI program, including preliminary setup, branching into several processes, and cleanup when program execution is about to cease. 
(No message passing occurs in this example.)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%file hello_world_mpi.c\n", "#include \n", "#include \n", "\n", "int main(int argc, char *argv[]) {\n", " int rank_id, ierr, num_ranks;\n", " double start_time, wall_time;\n", " \n", " printf(\"C MPI minimal working example\\n\");\n", " \n", " ierr = MPI_Init(&argc, &argv);\n", " start_time = MPI_Wtime();\n", " ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);\n", " ierr = MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);\n", " \n", " if (rank_id == 0) {\n", " printf(\"Number of available processors = %d.\\n\", num_ranks);\n", " }\n", " printf(\"\\tProcess number %d branching off.\\n\", rank_id);\n", " \n", " wall_time = MPI_Wtime() - start_time;\n", " \n", " if (rank_id == 0) {\n", " printf(\"Elapsed wallclock time = %8.6fs.\\n\", wall_time);\n", " }\n", " \n", " MPI_Finalize();\n", " return 0;\n", "}" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let\u2019s compile and execute this program before commenting on its contents. In order to link the program properly, there are a number of MPI libraries and include files which need to be specified. Fortunately, a convenient wrapper around your preferred C/C++/Fortran compiler has been provided: in this case, `mpicc`. To see what the wrapper is doing behind the scenes, use the `-show` option:\n", "\n", " $ mpicc -show\n", " clang -I/usr/local/Cellar/open-mpi/1.8.1/include\n", " -L/usr/local/opt/libevent/lib\n", " -L/usr/local/Cellar/open-mpi/1.8.1/lib -lmpi\n", "\n", "We can thus compile and execute the above example by simply entering,\n", "\n", " $ mpicc -o hello_world_mpi hello_world_mpi.c\n", "$ ./hello_world_mpi\n", " C MPI minimal working example\n", " Number of available processors = 1.\n", " Process number 0 branching off.\n", " Elapsed wallclock time = 0.000270s.\n" ] }, { "cell_type": "code", "collapsed": false, "input": [ "!mpicc -o hello_world_mpi hello_world_mpi.c\n", "!./hello_world_mpi" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, that's not right, is it? I have a number of processors on my modern machine, so why didn't this use them? In the case of MPI, it is necessary to use the script `mpiexec` which sets up the copies in parallel and co\u00f6rdinates message passing.\n", "\n", " $ mpiexec ./hello_world_mpi\n", " C MPI minimal working example\n", " C MPI minimal working example\n", " C MPI minimal working example\n", " C MPI minimal working example\n", " Process number 1 branching off.\n", " Number of available processors = 4.\n", " Process number 0 branching off.\n", " Elapsed wallclock time = 0.000021s.\n", " Process number 2 branching off.\n", " Process number 3 branching off." ] }, { "cell_type": "code", "collapsed": false, "input": [ "!mpiexec ./hello_world_mpi" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "(Note the _race conditions_ apparent here: with input and output, unless you explicitly control access and make all of the threads take turns, the output order is unpredictable.)\n", "\n", "Okay, let's step back and analyze the program now, noting the following points:\n", "- MPI functions pass values by reference.\n", "- MPI functions do not return the value of the variable they modify, but rather an error code. 
The modified value is in the referenced variable.\n", "- A universal communicator, `MPI_COMM_WORLD`, is defined to permit communication with all processes. (Subsets of this may be defined as desired.)\n", "- `MPI_Init` and `MPI_Finalize` must be called to initialize and terminate the message-passing environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise\n", "- Run the \"Hello World\" script with 1, 2, 4, 8, and 16 processes. See if the time required varies significantly. Try this with other numbers as well: 3, 7, etc." ] }, { "cell_type": "code", "collapsed": false, "input": [ "!mpiexec -np 1 ./hello_world_mpi" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Message Passing: `MPI_Send`, `MPI_Recv`\n", "\n", "At the highest level, an MPI implementation supports the parallel execution of a number of copies of a program which can intercommunicate with each other across an equal number of processors. Internode and intranode communications are implicitly handled by the operating system, interconnect, and MPI libraries.\n", "\n", "The fundamental pair of actions for a processor using MPI is _sending_ and _receiving_ messages. (This is also known as _point-to-point communication_.) A number of more complex actions, such as the partitioning of data or collection of calculated results, are built from these point-to-point communications. Finally, more advanced features exist to customize the processor topology, operations, and process management.\n", "\n", "![](./img/MPI-Send-Recv.png)\n", "\n", "Send and receive actions are always paired with each other. There are a number of flavors, but let's start with the basic two: `MPI_Send` and `MPI_Recv`. The function prototypes are:\n", "\n", " int MPI_Send(\n", " void* message, // Pointer to data to send\n", " int count, // Number of data values to send\n", " MPI_Datatype datatype, // Type of data (e.g. MPI_INT)\n", " int destination_rank, // Rank of process to receive message\n", " int tag, // Identifies message type\n", " MPI_Comm comm // Use MPI_COMM_WORLD\n", " )\n", " \n", " int MPI_Recv(\n", " void* message, // Points to location in memory where\n", " // received message is to be stored.\n", " int count, // MAX number of data values to accept \n", " MPI_Datatype datatype // Type of data (e.g. MPI_INT) \n", " int source_rank, // Rank of process to receive from\n", " // (Use MPI_ANY_SOURCE to accept\n", " // from any sender)\n", " int tag, // Type of message to receive\n", " // (Use MPI_ANY_TAG to accept any type)\n", " MPI_Comm comm, // Use MPI_COMM_WORLD\n", " MPI_Status* status // To receive info about the message\n", " )\n", "\n", "There's a lot of information there, and none of it optional (if you want to hide it, use a language and package such as Python and [MPI4Py](mpi4py.scipy.org)). Let's unpack the arguments a little more with a trivial example that implements the previous dart-throwing example." 
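, "\n",
"\n",
"Before we do, here is the bare skeleton of a single paired send and receive, just so the arguments are easy to see in isolation. (This little program is our own illustrative sketch; the file name `sendrecv_minimal.c` and the `msg` payload are not part of the lesson's darts code.)"
] }, { "cell_type": "code", "collapsed": false, "input": [
"%%file sendrecv_minimal.c\n",
"// Illustrative sketch: rank 1 sends one int to rank 0, which prints it.\n",
"// Run with at least two processes, e.g. `mpiexec -np 2 ./sendrecv_minimal`.\n",
"#include <stdio.h>\n",
"#include <mpi.h>\n",
"\n",
"int main(int argc, char *argv[]) {\n",
"    int rank_id, num_ranks, msg = 0;\n",
"    MPI_Status status;\n",
"\n",
"    MPI_Init(&argc, &argv);\n",
"    MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);\n",
"    MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);\n",
"\n",
"    if (num_ranks < 2) {\n",
"        if (rank_id == 0) printf(\"Run this with at least two processes.\\n\");\n",
"    } else if (rank_id == 1) {\n",
"        msg = 42;                                          // the payload to send\n",
"        MPI_Send(&msg, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);  // one MPI_INT, to rank 0, tag 0\n",
"    } else if (rank_id == 0) {\n",
"        // Accept exactly one MPI_INT from rank 1 with any tag; message metadata lands in `status`.\n",
"        MPI_Recv(&msg, 1, MPI_INT, 1, MPI_ANY_TAG, MPI_COMM_WORLD, &status);\n",
"        printf(\"Rank 0 received %d from rank %d.\\n\", msg, status.MPI_SOURCE);\n",
"    }\n",
"\n",
"    MPI_Finalize();\n",
"    return 0;\n",
"}"
], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [
"Compile it with `mpicc -o sendrecv_minimal sendrecv_minimal.c` and launch it under `mpiexec`. With the argument lists in hand, now for the dart-throwing version itself."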
] }, { "cell_type": "code", "collapsed": false, "input": [ "%%file darts_pi.c\n", "#include \n", "#include \n", "#include \n", "#include \n", "#include \n", "\n", "#define N 100000\n", "#define R 1\n", "\n", "double uniform_rng() { return (double)rand() / (double)RAND_MAX; } // Not great but not the point of this exercise.\n", "\n", "int main(int argc, char *argv[]) {\n", " int rank_id, ierr, num_ranks;\n", " MPI_Status status;\n", " \n", " ierr = MPI_Init(&argc, &argv);\n", " ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);\n", " ierr = MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);\n", " srand(time(NULL) + rank_id);\n", " \n", " // Calculate some number of darts thrown into the circle.\n", " int n_circle = 0;\n", " double x, y;\n", " for (int t = 0; t < N; t++) {\n", " x = uniform_rng();\n", " y = uniform_rng();\n", " if (x*x + y*y < (double)R*R) n_circle++;\n", " }\n", " \n", " // If this is the first process, then gather everyone's data. Otherwise, send it to the first process.\n", " int total_circle[num_ranks]; // C99 Variable Length Arrays; compile with `-std=c99` or else use `malloc`.\n", " if (rank_id == 0) {\n", " total_circle[0] = n_circle;\n", " for (int i = 1; i < num_ranks; i++) {\n", " ierr = MPI_Recv(&total_circle[i], 1, MPI_INT, i, MPI_ANY_TAG, MPI_COMM_WORLD, &status);\n", " //printf (\"\\t%d: recv %d from %d\\n\", rank_id, total_circle[i], i);\n", " }\n", " } else {\n", " ierr = MPI_Send(&n_circle, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);\n", " //printf (\"\\t%d: send %d to %d\\n\", rank_id, n_circle, 0);\n", " }\n", " \n", " // Now sum over the data and calculate the approximation of pi.\n", " int total = 0;\n", " if (rank_id == 0) {\n", " for (int i = 0; i < num_ranks; i++) {\n", " total += total_circle[i];\n", " }\n", " printf(\"With %d trials, the resulting approximation to pi = %f.\\n\", num_ranks*N, 4.0*(double)total/((double)N*(double)num_ranks));\n", " }\n", " \n", " MPI_Finalize();\n", " return 0;\n", "}" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Compile and execute this code in the shell:\n", "\n", " $ mpicc -std=c99 -o darts_pi darts_pi.c\n", "$ mpiexec -n 4 ./darts_pi \n", " With 400000 trials, the resulting approximation to pi = 3.137710.\n", " Elapsed wallclock time = 0.002829s.\n", "\n", "A few remarks:\n", "- **We know this algorithm is inefficient**: 400,000 trials for 2 digits of accuracy is absurd! Compare a series solution, which should give you that in a handful of terms.\n", "- I don't do a great job of **calculating the random values as floating-point values**, just dividing by the maximum possible integer value from `stdlib.h`. (This example is doubly dangerous in that it assumes that the default C random number library is thread safe! **It is not!**\u2014more on that in the OpenMP lesson.)\n", "- Note how we are **accessing the respective elements of the array** `total_circle` in the `MPI_Recv` clause.\n", "- We **limit the efficiency** by receiving the messages in order." 
] }, { "cell_type": "code", "collapsed": false, "input": [ "# To run this code from within the notebook.\n", "!mpicc -std=c99 -o darts_pi darts_pi.c\n", "!mpiexec -n 4 ./darts_pi" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Collective Operations: `MPI_Reduce`\n", "\n", "We can render this function much more readable and less error-prone by using the convenience function `MPI_Reduce`, which achieves the same effective outcome as the prior code without the explicit message management. Reduction takes a series of values from each processor, applies some operation to them, and then places the result on the root process (which doesn't have to be `0`, although it often is).\n", "\n", "![](./img/MPI-Reduce.png)\n", "\n", " int MPI_Reduce(\n", " void* value, // Input value from this process\n", " void* answer, // Result -- on root process only\n", " int count, // Number of values -- usually 1\n", " MPI_Datatype datatype, // Type of data (e.g. MPI_INT) \n", " MPI_Op operation, // What to do (e.g. MPI_SUM) \n", " int root, // Process that receives the answer \n", " MPI_Comm comm // Use MPI_COMM_WORLD \n", " )\n", "\n", "In the context of our prior code, we can convert the code segment as follows:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%file darts_pi.c\n", "#include \n", "#include \n", "#include \n", "#include \n", "#include \n", "\n", "#define N 100000\n", "#define R 1\n", "\n", "double uniform_rng() { return (double)rand() / (double)RAND_MAX; } // Not good but not the point of this exercise.\n", "\n", "int main(int argc, char *argv[]) {\n", " int rank_id, ierr, num_ranks;\n", " MPI_Status status;\n", " \n", " ierr = MPI_Init(&argc, &argv);\n", " ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);\n", " ierr = MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);\n", " srand(time(NULL) + rank_id);\n", " \n", " // Calculate some number of darts thrown into the circle.\n", " int n_circle = 0;\n", " double x, y;\n", " for (int t = 0; t < N; t++) {\n", " x = uniform_rng();\n", " y = uniform_rng();\n", " if (x*x + y*y < (double)R*R) n_circle++;\n", " }\n", " \n", " // Reduce by summation over all data and output the result.\n", " int total;\n", " ierr = MPI_Reduce(&n_circle, &total, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);\n", " if (rank_id == 0) {\n", " printf(\"With %d trials, the resulting approximation to pi = %f.\\n\", num_ranks*N, 4.0*(double)total/((double)N*(double)num_ranks));\n", " }\n", " \n", " MPI_Finalize();\n", " return 0;\n", "}" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# To run this code from within the notebook.\n", "!mpicc -std=c99 -o darts_pi darts_pi.c\n", "!mpiexec -n 4 ./darts_pi" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That reduces ten significant lines of code, two `for` loops, $N_\\text{processor}$ MPI function calls, and an array allocation to a single MPI function call. Not bad. This is typical, I find, of numerical codes: well-written MPI code in a few key locations covers the bulk of your communication needs, and you rarely need to explicitly pass messages or manage processes dynamically." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A Finite Difference Example\n", "\n", "Let's look through an integrated example which will introduce a few new functions and give you a feel for how MPI operates in a larger code base. 
We will utilize the finite difference method to solve Poisson's equation for an electrostatic potential:\n", "$${\\nabla}^2 \\varphi = -\\frac{\\rho_f}{\\varepsilon}$$\n", "where $\\rho_f$ are the (known) positions of charges; $\\varepsilon$ is the permittivity of the material; and $\\varphi$ is the resultant scalar electric potential field.\n", "\n", "We use a second-order central difference scheme in the discretization of the differential equation in space and take an initial guess which will require iteration to obtain a steady-state solution. The resulting equation for an arbitrary location in the $(x, y)$ grid (assuming uniform discretization) subject to a set of discrete point charges $\\sum \\rho_f$ is:\n", "$$\\frac{\\varphi_{i+1,j}-2\\varphi_{i,j}+\\varphi_{i-1,j}}{\\delta^2} + \\frac{\\varphi_{i,j+1}-2\\varphi_{i,j}+\\varphi_{i,j-1}}{\\delta^2} = \\frac{\\sum \\rho_f}{\\varepsilon} \\implies$$\n", "$$\\varphi_{i,j} = \\frac{1}{4} \\left(\\varphi_{i+1,j}+\\varphi_{i-1,j}+\\varphi_{i,j+1}+\\varphi_{i,j-1}\\right) - \\frac{\\Delta^2}{4} \\left(\\frac{\\sum \\rho_f}{\\varepsilon}\\right) \\text{.}$$\n", "(As the discretization does not subject grid locations to nonlocal influences, it is necessary for the grid locations next to the point charges to propagate that information outward as the iteration proceeds towards a solution.)\n", "\n", "For some known distribution of charges, we can construct a matrix equation and solve it appropriately either by hand-coding an algorithm or by using a library such as [GNU Scientific Library](https://www.gnu.org/software/gsl/). In this case, to keep things fairly explicit, we won't write a matrix explicitly and will instead solve each equation in a `for` loop.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Serial Code**\n", "\n", "One of the odd things we need to consider is that because we are using point charges our grid has to overlap with them directly. (This isn't as much of a problem with distributions.) So we'll just start with a handful of point charges for now, with the boundaries from $(-1,-1)$ to $(1,1)$ set to zero." ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%file fd_charge.c\n", "// This is a serial version of the code for reference.\n", "#include \n", "#include \n", "#include \n", "#include //for memcpy()\n", "#include \n", "\n", "#define BOUNDS 1.0\n", "\n", "double uniform_rng() { return (double)rand() / (double)RAND_MAX; } // Not good but not the point of this exercise.\n", "struct point_t { int x, y; double mag; };\n", "\n", "int main(int argc, char *argv[]) {\n", " int N = 10;\n", " if (argc > 1) N = atoi(argv[1]);\n", " int size = N % 2 == 0 ? 
N : N+1; // ensure odd N\n", " int iter = 10;\n", " if (argc > 2) iter = atoi(argv[2]);\n", " int out_step = 1000;\n", " if (argc > 3) out_step = atoi(argv[3]);\n", " double eps = 8.8541878176e-12; // permittivity of the material\n", " srand(time(NULL));\n", " \n", " // Initialize the grid with an initial guess of zeros as well as the coordinates.\n", " double phi[size][size]; // C99 Variable Length Arrays; compile with `-std=c99` or else use `malloc`.\n", " double oldphi[size][size];\n", " double x[size],\n", " y[size];\n", " double dx = (2*BOUNDS)/size,\n", " dy = dx;\n", " for (int i = 0; i < size; i++) {\n", " x[i] = (double)i*dx - BOUNDS;\n", " for (int j = 0; j < size; j++) {\n", " if (i == 0) y[j] = (double)j*dy - BOUNDS;\n", " phi[i][j] = 0.0;\n", " }\n", " }\n", " // Set up a few random points and magnitudes for the electrostatics problem.\n", " int K = 20;\n", " struct point_t pt_srcs[K];\n", " for (int k = 0; k < K; k++) {\n", " pt_srcs[k].x = (int)(uniform_rng() * N);\n", " pt_srcs[k].y = (int)(uniform_rng() * N);\n", " pt_srcs[k].mag = uniform_rng() * 2.0 - 1.0;\n", " printf(\"(%f, %f) @ %f\\n\", x[pt_srcs[k].x], y[pt_srcs[k].y], pt_srcs[k].mag);\n", " }\n", " \n", " // Iterate forward.\n", " int n_steps = 0; // total number of steps iterated\n", " double inveps = 1.0 / eps; // saves a division every iteration over the square loop\n", " double pt_src = 0.0; // accumulator for whether a point source is located at a specific (i,j) index site\n", " while (n_steps < iter) {\n", " memcpy(oldphi, phi, size*size);\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " // Calculate point source contributions.\n", " pt_src = 0;\n", " for (int k = 0; k < K; k++) {\n", " pt_src = pt_src + ((pt_srcs[k].x == i && pt_srcs[k].y == j) ? pt_srcs[k].mag : 0.0);\n", " }\n", " phi[i][j] = 0.25*dx*dx * pt_src * inveps\n", " + 0.25*(i == 0 ? 0.0 : phi[i-1][j])\n", " + 0.25*(i == size-1 ? 0.0 : phi[i+1][j])\n", " + 0.25*(j == 0 ? 0.0 : phi[i][j-1])\n", " + 0.25*(j == size-1 ? 
0.0 : phi[i][j+1]);\n", " }\n", " }\n", " if (n_steps % out_step == 0) {\n", " printf(\"Iteration #%d:\\n\", n_steps);\n", " printf(\"\\tphi(%f, %f) = %24.20f\\n\", x[(int)(0.5*N-1)], y[(int)(0.25*N+1)], phi[(int)(0.5*N-1)][(int)(0.25*N+1)]);\n", " }\n", " n_steps++;\n", " }\n", " \n", " // Write the final condition out to disk and terminate.\n", " printf(\"Terminated after %d steps.\\n\", n_steps);\n", " \n", " FILE* f;\n", " f = fopen(\"./data.txt\", \"w\"); // wb -write binary\n", " if (f != NULL) {\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " fprintf(f, \"%f\\t\", phi[i][j]);\n", " }\n", " fprintf(f, \"\\n\");\n", " }\n", " fclose(f);\n", " } else {\n", " //failed to create the file\n", " }\n", "\n", " return 0;\n", "}\n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "!mpicc -std=c99 -o fd_charge fd_charge.c\n", "!./fd_charge 200 80000 1000" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Use Python to visualize the result quickly.\n", "import numpy as np\n", "import matplotlib as mpl\n", "import matplotlib.pyplot as plt\n", "from matplotlib import cm\n", "%matplotlib inline\n", "#mpl.rcParams['figure.figsize']=[20,20]\n", "\n", "data = np.loadtxt('./data.txt')\n", "grdt = np.gradient(data);\n", "mx = 10**np.floor(np.log10(data.max()))\n", "\n", "fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20,20))\n", "axes[0].imshow(data, cmap=cm.seismic, vmin=-mx, vmax=mx, extent=[-1,1,-1,1])\n", "axes[1].imshow(data, cmap=cm.seismic, vmin=-mx, vmax=mx)\n", "#axes[1].quiver(grdt[0].T, grdt[1].T, scale=5e8) #only good for N < 50\n", "axes[1].contour(data, cmap=cm.bwr, vmin=-mx, vmax=mx, levels=np.arange(-mx,mx+1,(2*mx)/1e2))\n", "fig.show()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So for a quick and dirty job (and no validation or verification of the numerics), that serial code will do. But you can already notice significant slowdown towards convergence even at 400\u00d7400 resolution (0.025 length units). To handle bigger problems, it will be necessary to parallelize.\n", "\n", "**Parallel Code**\n", "\n", "We will restrict the domain decomposition to square numbers of processes for the sake of pedagogy, although you likely wouldn't be this restrictive in a real code. Each processor will be responsible for calculating the electrostatics on its segment of the domain, and will not actually have access to the values calculated by the other processes unless they are explicitly communicated.\n", "\n", "\n", "\n", "\n", "\n", "\n", "

One decomposition of a domain by area and processor.
This was used in the code below.

An alternative decomposition. Although we opted for the left-hand scheme
in the code below, this one is probably easier all around.

\n", "\n", "Why would we need to communicate any values? Well, in this case, parallelization is nontrivial: the finite difference algorithm is nearest-neighbor based, so with a spatial decomposition of the domain we will still need to communicate boundary cells to neighboring processes. Thus we will require _ghost cells_, which refer to data which are not located on this processor natively but are retrieved and used in calculations on the boundary of the local domain.\n", "\n", "![](./img/ghost-cells.png)\n", "\n", "We can set these up either by having separate arrays for the ghost cells (which leads to clean messaging but messy numerical code) or having integrated arrays in the main `phi` array (which leads to nasty messaging code but nicer numerics). We will use clean messaging to clarify the message passing aspect at the cost of obfuscating the numerics a bit with nested ternary cases." ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%file fd_charge.c\n", "// This is a parallel version of the code.\n", "#include \n", "#include \n", "#include \n", "#include //for memcpy()\n", "#include \n", "#include \n", "\n", "#define BOUNDS 1.0\n", "\n", "double uniform_rng() { return (double)rand() / (double)RAND_MAX; } // Not good but not the point of this exercise.\n", "struct point_t { int x, y; double mag; };\n", "\n", "int main(int argc, char *argv[]) {\n", " int size = 10;\n", " if (argc > 1) size = atoi(argv[1]);\n", " int iter = 10;\n", " if (argc > 2) iter = atoi(argv[2]);\n", " int out_step = 1000;\n", " if (argc > 3) out_step = atoi(argv[3]);\n", " double eps = 8.8541878176e-12; // permittivity of the material\n", " \n", " int rank_id, ierr, num_ranks;\n", " MPI_Status status;\n", " MPI_Request request;\n", " \n", " ierr = MPI_Init(&argc, &argv);\n", " ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);\n", " ierr = MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);\n", " srand(time(NULL) + rank_id);\n", " \n", " // We will restrict partitions of the domain (and thus processes) to square numbers for the sake of pedagogy.\n", " // Not fast, not elegant, but we only do it once.\n", " if (abs(sqrt((float)num_ranks)-(int)sqrt((float)num_ranks)) > 1e-6) {\n", " printf(\"Expected square number for number of processes; actual number %f.\\n\", (float)abs((float)sqrt((float)num_ranks)));\n", " MPI_Abort(MPI_COMM_WORLD, MPI_ERR_OTHER);\n", " return 0;\n", " }\n", " \n", " // Calculate the processor grid preparatory to the domain partition. We'll make the assignment into a\n", " // sqrt(N) x sqrt(N) array. This allows easy determination of neighboring processes.\n", " int base = (int)abs(sqrt(num_ranks));\n", " int nbr_l = rank_id % base - 1 >= 0 ? rank_id - 1 : -1; //printf(\"%d: nbr_l = %d\\n\", rank_id, nbr_l);\n", " int nbr_r = rank_id % base + 1 < base ? rank_id + 1 : -1; //printf(\"%d: nbr_r = %d\\n\", rank_id, nbr_r);\n", " int nbr_t = rank_id + base < num_ranks? rank_id + base : -1; //printf(\"%d: nbr_t = %d\\n\", rank_id, nbr_t);\n", " int nbr_b = rank_id - base >= 0 ? rank_id - base : -1; //printf(\"%d: nbr_b = %d\\n\", rank_id, nbr_b);\n", " \n", " // Initialize the _local_ grid with an initial guess of zeros as well as the coordinates. 
We will keep each\n", " // local domain the same size as the original domain, 2x2 in coordinates, and just translate them to make a\n", " // larger domain---in this case, adding processes increases the domain size rather than the mesh resolution.\n", " double phi[size][size]; // C99 Variable Length Arrays; compile with `-std=c99` or else use `malloc`.\n", " double oldphi[size][size];\n", " double x[size],\n", " y[size];\n", " double dx = (2*BOUNDS)/size,\n", " dy = dx;\n", " double base_x = 2*BOUNDS*(rank_id % base) - BOUNDS*base,\n", " base_y = 2*BOUNDS*(rank_id - (rank_id % base)) / base - BOUNDS*base;\n", " for (int i = 0; i < size; i++) {\n", " x[i] = (double)i*dx + base_x;\n", " for (int j = 0; j < size; j++) {\n", " if (i == 0) y[j] = (double)j*dy + base_y;\n", " phi[i][j] = 0.0;\n", " }\n", " }\n", " double phi_l_in[size], // ghost cell messaging arrays\n", " phi_r_in[size],\n", " phi_t_in[size],\n", " phi_b_in[size],\n", " phi_l_out[size],\n", " phi_r_out[size],\n", " phi_t_out[size],\n", " phi_b_out[size];\n", " for (int i = 0; i < size; i++) {\n", " phi_l_in[i] = 0.0;\n", " phi_r_in[i] = 0.0;\n", " phi_t_in[i] = 0.0;\n", " phi_b_in[i] = 0.0;\n", " }\n", " //printf(\"%d: (%f,%f)-(%f,%f)\\n\", rank_id, base_x, base_y, x[size-1], y[size-1]);\n", " \n", " // Set up a few random points and magnitudes for the electrostatics problem.\n", " int K = 80;\n", " struct point_t pt_srcs[K];\n", " for (int k = 0; k < K; k++) {\n", " (uniform_rng()); // proof that the RNG is bad: without this the first number is the same on every thread.\n", " int x_rn = (int)(uniform_rng() * size);\n", " int y_rn = (int)(uniform_rng() * size);\n", " pt_srcs[k].x = x_rn;\n", " pt_srcs[k].y = y_rn;\n", " pt_srcs[k].mag = uniform_rng() * 2.0 - 1.0;\n", " //printf(\"%d: [%d, %d] (%f, %f) @ %f\\n\", rank_id, x_rn, y_rn, x[pt_srcs[k].x], y[pt_srcs[k].y], pt_srcs[k].mag);\n", " }\n", " \n", " // Iterate forward.\n", " int n_steps = 0; // total number of steps iterated\n", " double inveps = 1.0 / eps; // saves a division every iteration over the square loop\n", " double pt_src = 0.0; // accumulator for whether a point source is located at a specific (i,j) index site\n", " while (n_steps < iter) {\n", " memcpy(oldphi, phi, size*size); // Back up the old matrix for residual comparison, if desired.\n", " // Propagate the matrix equation forward towards a solution.\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " // Calculate point source contributions.\n", " pt_src = 0;\n", " for (int k = 0; k < K; k++) {\n", " pt_src = pt_src + ((pt_srcs[k].x == i && pt_srcs[k].y == j) ? pt_srcs[k].mag : 0.0);\n", " }\n", " phi[i][j] = 0.25*dx*dx * pt_src * inveps\n", " + 0.25*(i == 0 ? (nbr_l < 0 ? 0.0 : phi_l_in[j]) : phi[i-1][j])\n", " + 0.25*(i == size-1 ? (nbr_r < 0 ? 0.0 : phi_r_in[j]) : phi[i+1][j])\n", " + 0.25*(j == 0 ? (nbr_b < 0 ? 0.0 : phi_b_in[i]) : phi[i][j-1])\n", " + 0.25*(j == size-1 ? (nbr_t < 0 ? 0.0 : phi_t_in[i]) : phi[i][j+1]);\n", " }\n", " }\n", " \n", " // Communicate the border cell information to neighboring processes. We will alternate odd and even processes,\n", " // although there are more sophisticated ways to do this with nonblocking message passing. 
Why do we alternate?\n", " for (int i = 0; i < size; i++) {\n", " phi_l_out[i] = phi[0][i];\n", " phi_r_out[i] = phi[size-1][i];\n", " phi_t_out[i] = phi[i][size-1];\n", " phi_b_out[i] = phi[i][0];\n", " }\n", " // Pass data left.\n", " if (nbr_l >= 0) MPI_Isend(phi_l_out, size, MPI_DOUBLE, nbr_l, 0, MPI_COMM_WORLD, &request);\n", " if (nbr_r >= 0) MPI_Irecv(phi_r_in, size, MPI_DOUBLE, nbr_r, MPI_ANY_TAG, MPI_COMM_WORLD, &request);\n", " \n", " // Pass data right.\n", " if (nbr_r >= 0) MPI_Isend(phi_r_out, size, MPI_DOUBLE, nbr_r, 0, MPI_COMM_WORLD, &request);\n", " if (nbr_l >= 0) MPI_Irecv(phi_l_in, size, MPI_DOUBLE, nbr_l, MPI_ANY_TAG, MPI_COMM_WORLD, &request);\n", " \n", " // Pass data up.\n", " if (nbr_t >= 0) MPI_Isend(phi_t_out, size, MPI_DOUBLE, nbr_t, 0, MPI_COMM_WORLD, &request);\n", " if (nbr_b >= 0) MPI_Irecv(phi_b_in, size, MPI_DOUBLE, nbr_b, MPI_ANY_TAG, MPI_COMM_WORLD, &request);\n", " \n", " // Pass data down.\n", " if (nbr_b >= 0) MPI_Isend(phi_b_out, size, MPI_DOUBLE, nbr_b, 0, MPI_COMM_WORLD, &request);\n", " if (nbr_t >= 0) MPI_Irecv(phi_t_in, size, MPI_DOUBLE, nbr_t, MPI_ANY_TAG, MPI_COMM_WORLD, &request);\n", " MPI_Barrier(MPI_COMM_WORLD);\n", " \n", " // Output information periodically.\n", " if (rank_id == 0 && n_steps % out_step == 0) {\n", " printf(\"Iteration #%d:\\n\", n_steps);\n", " printf(\"\\tphi(%6.3f, %6.3f) = %24.20f\\n\", x[(int)(0.5*size-1)], y[(int)(0.25*size+1)], phi[(int)(0.5*size-1)][(int)(0.25*size+1)]);\n", " }\n", " n_steps++;\n", " }\n", " if (rank_id == 0) {\n", " printf(\"Terminated after %d steps.\\n\", n_steps);\n", " printf(\"\\tphi(%6.3f, %6.3f) = %24.20f\\n\", x[(int)(0.5*size-1)], y[(int)(0.25*size+1)], phi[(int)(0.5*size-1)][(int)(0.25*size+1)]);\n", " }\n", " \n", " // Write the final condition out to disk and terminate.\n", " // Parallel I/O, while supported by MPI, is a whole other ball game and we won't go into here.\n", " // Thus we will gather all of the data to process rank 0 and output it from there.\n", " double phi_all[size*base*size*base];\n", " // This version of the code ^ incorrectly orders the processor output. We actually have to transpose each process's\n", " // data into a column-major format to get everything aligned properly.\n", " double phi_temp[size][size];\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " phi_temp[j][i] = phi[i][j];\n", " }\n", " }\n", " MPI_Gather(&phi_temp[0][0], size*size, MPI_DOUBLE, &phi_all[0], size*size, MPI_DOUBLE, 0, MPI_COMM_WORLD);\n", " // At this point, the data are in the right array, but have an odd stride and relative order. 
We'll fix that now.\n", " double phi_new[size*base][size*base];\n", " int offset_x, offset_y;\n", " for (int p = 0; p < base*base; p++) { // source process for data\n", " offset_x = (p-p%base)/base;\n", " offset_y = (p%base);\n", " for (int i = 0; i < size; i++) { // x-index of process data\n", " for (int j = 0; j < size; j++) { // y-index of process data\n", " phi_new[offset_y*size+j][offset_x*size+i] = phi_all[p*size*size+i*size+j];\n", " }\n", " }\n", " }\n", " \n", " if (rank_id == 0) {\n", " FILE* f;\n", " char filename[16];\n", " f = fopen(\"data.txt\", \"w\"); // wb -write binary\n", " if (f != NULL) {\n", " for (int i = 0; i < base*size; i++) {\n", " for (int j = 0; j < base*size; j++) {\n", " fprintf(f, \"%f\\t\", phi_new[i][j]);\n", " }\n", " fprintf(f, \"\\n\");\n", " }\n", " fclose(f);\n", " } else {\n", " //failed to create the file\n", " }\n", " }\n", " \n", " MPI_Finalize();\n", " return 0;\n", "}\n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "!mpicc -std=c99 -o fd_charge fd_charge.c\n", "!mpiexec -np 9 ./fd_charge 100 100001 5000" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Use Python to visualize the result quickly.\n", "import numpy as np\n", "import matplotlib as mpl\n", "import matplotlib.pyplot as plt\n", "from matplotlib import cm\n", "%matplotlib inline\n", "\n", "data = np.loadtxt('./data.txt')\n", "mx = 10**np.floor(np.log10(data.max()))\n", "\n", "fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(16,16))\n", "axes.imshow(data, cmap=cm.seismic, vmin=-mx, vmax=mx)\n", "fig.show()\n", "\n", "fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(16,16))\n", "axes.contour(data[::-1,:], cmap=cm.bwr, vmin=-mx, vmax=mx, levels=np.arange(-mx,mx+1,(2*mx)/1e2))\n", "fig.show()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "- What happens if I neglect to include the following conditional around the file output?\n", "\n", "\n", " if (rank_id == 0) { ... }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Domains naturally do not overlap in this case:\n", "\n", " 0: (-2.00,-2.00)-(-0.01,-0.01)\n", " 1: ( 0.00,-2.00)-( 1.99,-0.01)\n", " 2: (-2.00, 0.00)-(-0.01, 1.99)\n", " 3: ( 0.00, 0.00)-( 1.99, 1.99)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Message Passing: `MPI_Isend`, `MPI_Irecv`\n", "\n", "The vanilla MPI send and receive operations are _blocking_, meaning that when you send a message the process waits at that point until an acknowledgment of receipt is sent by the receiving process. As you can imagine, this both slows down your code and makes even simple messaging schemes cumbersome (what happens if you tell every process to send at the same time and receive at the next function call?). In order to prevent this kind of deadlock, we use _nonblocking_ communications, which do not block execution\u2014they just fire their message into the ether and proceed ahead.\n", "\n", "As before, there are a lot of variants, but the basic two are `MPI_Isend` and `MPI_Irecv`.\n", "\n", " int MPI_Isend(\n", " void* message, // Pointer to data to send\n", " int count, // Number of data values to send\n", " MPI_Datatype datatype, // Type of data (e.g. 
MPI_INT)\n", " int destination_rank, // Rank of process to receive message\n", " int tag, // Identifies message type\n", " MPI_Comm comm, // Use MPI_COMM_WORLD\n", " MPI_Request* request // To query about the status of the message\n", " )\n", " \n", " int MPI_Irecv(\n", " void* message, // Points to location in memory where\n", " // received message is to be stored.\n", " int count, // MAX number of data values to accept \n", " MPI_Datatype datatype // Type of data (e.g. MPI_INT) \n", " int source_rank, // Rank of process to receive from\n", " // (Use MPI_ANY_SOURCE to accept\n", " // from any sender)\n", " int tag, // Type of message to receive\n", " // (Use MPI_ANY_TAG to accept any type)\n", " MPI_Comm comm, // Use MPI_COMM_WORLD\n", " MPI_Request* request // To query about the status of the message\n", " )\n", "\n", "There's a lot of information there, and none of it optional. Let's unpack the arguments a little more with a trivial example that implements the previous dart-throwing example." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Collective Operations: `MPI_Barrier`, `MPI_Gather`\n", "\n", "Now, with nonblocking operations we can easily run into a situation where we need to know that a certain message has already been received in order to coherently progress: the array needs to be updated, whatever. In that case, `MPI_Barrier` defines a point at which all processes will wait until they are synchronized with each other again. They slow code down, however, and so they should be used sparingly.\n", "\n", "![](./img/MPI-Barrier.gif)\n", "\n", " int MPI_Barrier(\n", " MPI_Comm comm // Use MPI_COMM_WORLD \n", " )\n", "\n", "### Exercise\n", "\n", "- What is the difference in the following two code snippets?\n", "\n", "\n", " // Pass data left.\n", " if (nbr_l >= 0) MPI_Isend(phi_l_out, size, MPI_DOUBLE, nbr_l, 0, MPI_COMM_WORLD, &request);\n", " if (nbr_r >= 0) MPI_Irecv(phi_r_in, size, MPI_DOUBLE, nbr_r, MPI_ANY_TAG, MPI_COMM_WORLD, &request);\n", " if (nbr_l >= 0 && n_steps == 1000) printf(\"%d: %d sending left %f to %d\\n\", n_steps, rank_id, phi_l_out[5], nbr_l);\n", " if (nbr_r >= 0 && n_steps == 1000) printf(\"%d: %d recving right %f of %d\\n\", n_steps, rank_id, phi_r_in[5], nbr_r);\n", " MPI_Barrier(MPI_COMM_WORLD);\n", "\n", " // Pass data left.\n", " if (nbr_l >= 0) MPI_Isend(phi_l_out, size, MPI_DOUBLE, nbr_l, 0, MPI_COMM_WORLD, &request);\n", " if (nbr_r >= 0) MPI_Irecv(phi_r_in, size, MPI_DOUBLE, nbr_r, MPI_ANY_TAG, MPI_COMM_WORLD, &request);\n", " MPI_Barrier(MPI_COMM_WORLD);\n", " if (nbr_l >= 0 && n_steps == 1000) printf(\"%d: %d sending left %f to %d\\n\", n_steps, rank_id, phi_l_out[5], nbr_l);\n", " if (nbr_r >= 0 && n_steps == 1000) printf(\"%d: %d recving right %f of %d\\n\", n_steps, rank_id, phi_r_in[5], nbr_r);\n", "\n", "\n", "### `MPI_Gather`\n", "\n", "Previously, we examined the function `MPI_Reduce`, which took data from every process, transformed them by some operation like addition or multiplication, and placed the result in a variable on `root`. In the case above, we needed to gather data from every process _without_ transforming the data or collapsing it from an array to a single value.\n", "\n", "`MPI_Gather` takes arrays from every process and combines them (by rank) into a new array on `root`. 
The new array, as we saw above, is ordered by rank, meaning that if a different topology is in use you have to correct the data (which we did).\n", "\n", "![](./img/MPI-Gather-vec.png)\n", "\n", " int MPI_Gather(\n", " void* sendbuf, // Starting address of send buffer\n", " int sendcnt, // Number of elements in send buffer\n", " MPI_Datatype datatype, // Type of data (e.g. MPI_INT) \n", " void* recvbuf, // Starting address of receive buffer\n", " int recvcnt, // Number of elements for any single receive\n", " MPI_Datatype recvtype, // Type of data (e.g. MPI_INT) \n", " int root, // Process that receives the answer \n", " MPI_Comm comm // Use MPI_COMM_WORLD \n", " )\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case, MPI is a _lot_ more work, but you gain the capability of scaling to a reasonably large number of processors.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Scaling\n", "\n", "_Scaling_ refers to how efficiently your code retains performance as both the problem size $N$ and the number of processes $P$ increase. It is typically divided into _weak_ and _strong_ scaling.\n", "\n", "**Strong Scaling**: How does the performance behave as the number of processors $P$ increases for a fixed total problem size $N$?\n", "\n", "This is what most people think of as scaling, and describes the overall performance of a code as a problem is subdivided into smaller and smaller pieces between processes. To test this, select a problem size suitable to scaling to a large number of processes (for instance, divisible by 64), and simply call larger and larger jobs while outputting timing information.\n", "\n", "**Weak Scaling**: How does the performance behave as the number of processors $P$ varies with each processor handling a fixed process size $N_P = N/P = \\text{const}$?\n", "\n", "In this case (which is most appropriate for $O(N)$ algorithms but can be used for any system), we observe more about the relative overhead incurred by including additional machines in the problem solution. Weak scaling can be tested in a straightforward manner by keeping track of the pieces of the puzzle each process is responsible for. With strongly decomposable systems, like molecular dynamics or PDE solution on meshes, weak scaling can be revealing. However, in my experience there are many types of problems for which solving for $N$ and solving for $N+1$ are radically different problems, such as in density functional theory where the addition of an electron means the solution of a fundamentally different physical system.\n", "\n", "([ref](https://web.archive.org/web/20140307224104/http://www.stfc.ac.uk/cse/25052.aspx))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Memory and C99 Variable-Length Arrays\n", "\n", "Are the following two segments of code equivalent in C99?\n", "\n", "**Snippet 1**\n", "\n", " int m = 5, n = 10, p = 7;\n", " double array[m][n][p];\n", " \n", "**Snippet 2**\n", "\n", " double ***array;\n", " array = (double***) malloc(m*sizeof(double**));\n", " for (i = 0; i < m; i++) {\n", " array[i] = (double**) malloc(n*sizeof(double*));\n", " for (j = 0; j < n; j++) {\n", " array[i][j] = (double*) malloc(p*sizeof(double));\n", " }\n", " }\n", "\n", "They _aren't_\u2014but you may not be sure why. 
The two snippets are _not_ equivalent from a memory standpoint because of where they allocate `array`: the first allocates `array` on the stack, the second on the heap.\n", "\n", "Application memory is conventionally divided into two regions: the _stack_ and the _heap_. For many applications it doesn't really matter which you use, but when you start defining very large arrays of data or using multiple threads in parallel then it can become critical to manage your memory well. ([ref1](https://stackoverflow.com/questions/79923/what-and-where-are-the-stack-and-heap)) ([ref2](https://stackoverflow.com/questions/22555639/mpi-communicate-large-two-dimensional-arrays))\n", "\n", "- **Stack** memory is specific to a thread, and is generally fixed in size. It is often faster to allocate in, and easier to read the allocation code.\n", "\n", "- **Heap** memory is shared among all threads, and grows as demand requires.\n", "\n", "C99 variable-length arrays (VLAs) allocate on the stack. Since you can overflow the stack, VLAs are probably not a good idea for serious numeric code. They do make code so much more readable that I opted for them in this lesson.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Best Practices for Scientific Parallel Programming\n", "\n", "Now this may be kind of a funny aside, but it is a concern I often had as a student. MPI owes a lot to the local computer science and parallel programming folks on campus, so is it just a tool we prefer because it is \"in-house\"? There are tools that get talked about a lot, as well as advanced research projects that offer interesting features but are not widely adopted, like [CHARM++](http://charm.cs.uiuc.edu/) or [UPC](http://upc.lbl.gov/). No, MPI is used everywhere and really is the standard for supercomputing today.\n", "\n", "### The Process of Creating a Parallel Program\n", "\n", "**Matrix Relaxation**\n", "\n", "The best parallel algorithm is oten not the best sequential algorithm. For instance, in matrix solution, we can select from Jacobi relaxation or Gauss\u2013Seidel relaxation. [Jacobi relaxation](https://en.wikipedia.org/wiki/Jacobi_method) is a na\u00efve algorithm to solve a matrix equation iteratively by inverting only the diagonal elements and modifying an initial guess progressively until a solution within certain tolerances is found. [Gauss\u2013Seidel relaxation](https://en.wikipedia.org/wiki/Gauss%E2%80%93Seidel_method) is formally a better algorithm, since it also takes advantage of updated data points from _the current iteration_ to improve the subsequent data points in-flight as well. However, Jacobi relaxation, relying exclusively on a prior iteration, can be sensibly implemented in parallel while Gauss\u2013Seidel iteration can not.\n", "\n", "**Prefix Summation**\n", "\n", "Consider also summing vector elements of $A$ such that each resulting element $B_i$ is the sum over all elements prior to index $i$:\n", "$$\\begin{eqnarray} B_{0} = & A_{0} \\\\ B_i = & \\sum_{j = 1}^i A_j \\text{.} \\end{eqnarray}$$\n", "\n", "In serial, this algorithm requires $N = P$ stages. In parallel, this can be accomplished with a _scan_ operation in $\\log P$ stages. ([ref1](https://en.wikipedia.org/wiki/Prefix_sum)) ([ref2](http://http.developer.nvidia.com/GPUGems3/gpugems3_ch39.html))\n", "\n", "\n", "\n", "\n", "\n", "\n", "

Serial Version

Parallel Version

\n", "\n", "**Basic Process**\n", "\n", "- Determine algorithms\n", "- Decide on decomposition of computation into work and data units\n", "- Decide on assignment of these units to processors\n", "- Express decisions in language available for the machine\n", "\n", "**Tips**\n", "\n", "- Architect a new parallel top-level structure rather than parallelizing a sequential program\n", "- Insert existing sequential code where appropriate\n", "- Refactor a portion of the sequential code into a parallel structure\n", "- Rewrite the remainder\n", "\n", "\n", "\n", "- Express algorithms in terms of $N$ and $P$.\n", "- Visualize data flow, not control flow, to reveal dependencies." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Exercise\n", "- Implement the parallel prefix sum algorithm depicted graphically above. Feel free to copy code snippets from earlier to structure your code and then make it work." ] }, { "cell_type": "code", "collapsed": false, "input": [ "%%file parprefix.c\n", "// your code here" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Writing Scientific Code\n", "\n", "What else is (or should be) standard?\n", "\n", "_(Suggestions aggregated from Jay Alameda, Mark van Moers, and Neal Davis.)_\n", "\n", "- Take the time to learn your development environment of choice well\u2014there are language-aware code editors including code completion: IDEs (Eclipse, XCode), `vi` (YouCompleteMe), `emacs`.\n", "\n", "- Write self-documenting code: good variable and function names that follow a convention.\n", "\n", "- Modularize your code as much as practical.\n", "\n", "- Always include certain standard files when distributing your software: `README`, `INSTALL`, `CITATION`, `LICENSE`.\n", "\n", "- Use a compilation system: GNU Autotools (`./configure; make; make install`), CMake, etc. (This also lets you query the hardware to optimize at compile time.)\n", "\n", "- Use version control: Git, SVN, Mercurial, etc.\n", "\n", "- Use an issue-tracking system: GitHub issues, Bugzilla, etc.\n", "\n", "- Develop and use a unit-testing scheme (even a simple one).\n", "\n", "- Don't reinvent the wheel\u2014find existing libraries: Boost, SciPy, Netlib, Scalapack, PETSc, etc.\n", "\n", "- Use configuration files and command-line flags rather than hard-coding values.\n", "\n", "- Compile with all debug flags set (`-Wall -Werror -g`), and aim to compile with zero warnings.\n", "\n", "- Don't create new file formats: find something existing that works (`HDF5`, `PDB`, `XYZ`, etc.).\n", "\n", "- Have a long-term data management plan (now a requirement for NSF, other grants).\n", "\n", "- Carry out studies of the strong and weak scaling, as well as profiling so that you aren't wasting cluster cycles.\n", "\n", "### Debugging Scientific Code\n", "\n", "[![](./img/xkcd-future_self.png)](http://xkcd.com/)\n", "\n", "Now we come to one of the horrible truths about life and scientific computing: working code isn't necessarily correct code. Debugging is a much larger subject than we will be able to satisfactorily cover today, but I would like to give a ten-thousand-foot view of the subject as it applies to scientific computing.\n", "\n", "There are a few general maxims I've picked up from various sources over the years:\n", "\n", "- Nobody gets it right the first time, [including you]. (Lynne Williams)\n", "\n", "- Premature optimization is the root of all evil. 
(Donald Knuth)\n", "\n", "- Confusion is a clue that something is wrong\u2014don't ignore it. (Eliezer Yudkowsky paraphrase)\n", "\n", "- Checkpoint often.\n", "\n", "First, learn to use debugging and profiling tools. Two that are supported on Campus Cluster are `gdb` and `valgrind`. `gdb`, the GNU Debugger, is designed to work with many programming languages besides C, but it can be painful to get it to work with a parallel program. Actually, any parallel debugging is painful.\n", "\n", "The other tool, `valgrind`, monitors memory behavior. It can detect cache misses, memory leaks, and other problems which can lead to poor code performance and excessive memory demand.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Resources\n", "\n", "- [DeinoMPI documentation](http://mpi.deino.net/mpi_functions/). The most complete and thorough documentation (with examples) of MPI-2 functions.\n", "\n", "- [UMinn tutorial](http://static.msi.umn.edu/tutorial/scicomp/general/MPI/content1.html)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Appendix\n", "\n", "Code snippets for reference in teaching." ] }, { "cell_type": "code", "collapsed": false, "input": [ "// THIS CODE OUTPUTS THE RESULTS SERIALLY TO DISK FROM EACH PROCESS.\n", " // Write the final condition out to disk and terminate.\n", " // Parallel I/O, while supported by MPI, is a whole other ball game and we won't go into here.\n", " // Thus we will gather all of the data to process rank 0 and output it from there.\n", " double phi_all[size*base][size*base];\n", " //MPI_Send(&phi[0][0], base*N*base*N, MPI_DOUBLE\n", " \n", " FILE* f;\n", " char filename[16];\n", " sprintf(filename, \"./data-%d.txt\", rank_id);\n", " f = fopen(filename, \"w\"); // wb -write binary\n", " if (f != NULL) {\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " fprintf(f, \"%f\\t\", phi[i][j]);\n", " }\n", " fprintf(f, \"\\n\");\n", " }\n", " fclose(f);\n", " } else {\n", " //failed to create the file\n", " }\n", " " ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "# Use Python to visualize the result quickly.\n", "import numpy as np\n", "import matplotlib as mpl\n", "import matplotlib.pyplot as plt\n", "from matplotlib import cm\n", "%matplotlib inline\n", "#mpl.rcParams['figure.figsize']=[20,20]\n", "\n", "N = 9\n", "data = []\n", "grdt = []\n", "for i in range(0, N):\n", " data.append(np.loadtxt('./data-%d.txt'%i))\n", " grdt.append(np.gradient(data[-1]))\n", "\n", "fig, axes = plt.subplots(nrows=(int)(np.sqrt(N)), ncols=(int)(np.sqrt(N)), figsize=(12,12))\n", "for i in range(0, (int)(np.sqrt(N))):\n", " for j in range(0, (int)(np.sqrt(N))):\n", " axes[i][j].imshow(data[i*(int)(np.sqrt(N))+j].T, cmap=cm.seismic, vmin=-3.5e6, vmax=3.5e6, extent=[-1,1,-1,1])\n", " #axes[i][j].contour(data[i*(int)(np.sqrt(N))+j].T, cmap=cm.bwr, vmin=-1.5e6, vmax=1.5e6, levels=np.arange(-1e7,1e7+1,2e5))\n", "fig.show()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This code doesn't correctly handle the merge of data, instead spreading it out across segments twice as long and half as tall as the array.\n", "\n", " // Write the final condition out to disk and terminate.\n", " // Parallel I/O, while supported by MPI, is a whole other ball game and we won't go into here.\n", " // Thus we will gather all of the data to process rank 0 and output it from there.\n", " double 
phi_all[size*base][size*base];\n", " //MPI_Gather(&phi[0][0], size*size, MPI_DOUBLE, &phi_all[0][0], size*size, MPI_DOUBLE, 0, MPI_COMM_WORLD);\n", " // This version of the code ^ incorrectly orders the processor output. We actually have to transpose each process's\n", " // data into a column-major format to get everything aligned properly.\n", " double phi_new[size][size];\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " phi_new[j][i] = phi[i][j];\n", " }\n", " }\n", " if (rank_id==0) { printf(\"%d\\n\", rank_id);\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " printf(\"%f\\t\", phi[i][j]);\n", " }\n", " printf(\"\\n\");\n", " }}MPI_Barrier(MPI_COMM_WORLD);\n", " if (rank_id==1) { printf(\"%d\\n\", rank_id);\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " printf(\"%f\\t\", phi[i][j]);\n", " }\n", " printf(\"\\n\");\n", " }}MPI_Barrier(MPI_COMM_WORLD);\n", " if (rank_id==2) { printf(\"%d\\n\", rank_id);\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " printf(\"%f\\t\", phi[i][j]);\n", " }\n", " printf(\"\\n\");\n", " }}MPI_Barrier(MPI_COMM_WORLD);\n", " if (rank_id==3) { printf(\"%d\\n\", rank_id);\n", " for (int i = 0; i < size; i++) {\n", " for (int j = 0; j < size; j++) {\n", " printf(\"%f\\t\", phi[i][j]);\n", " }\n", " printf(\"\\n\");\n", " }}MPI_Barrier(MPI_COMM_WORLD);\n", " MPI_Gather(&phi[0][0], size*size, MPI_DOUBLE, &phi_all[0][0], size*size, MPI_DOUBLE, 0, MPI_COMM_WORLD);\n", " printf(\"\\n\\n\\n\");\n", " if (rank_id == 0) {\n", " FILE* f;\n", " char filename[16];\n", " f = fopen(\"data.txt\", \"w\"); // wb -write binary\n", " if (f != NULL) {\n", " for (int i = 0; i < base*size; i++) {\n", " for (int j = 0; j < base*size; j++) {\n", " fprintf(f, \"%f\\t\", phi_all[i][j]);\n", " printf(\"%f\\t\", phi_all[i][j]);\n", " }\n", " fprintf(f, \"\\n\");\n", " printf(\"\\n\");\n", " }\n", " fclose(f);\n", " } else {\n", " //failed to create the file\n", " }\n", " }\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![](./img/fd-smear.png)\n", "![](./img/fd-separate.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## List of MPI Functions\n", "**Initialization & Cleanup**\n", "\n", "`MPI_Init`\n", "\n", "`MPI_Finalize`\n", "\n", "`MPI_Abort`\n", "\n", "`MPI_Comm_rank`\n", "\n", "`MPI_Comm_size`\n", "\n", "`MPI_COMM_WORLD`\n", "\n", "`MPI_Wtime`\n", "\n", "**Message Passing**\n", "\n", "`MPI_Status`\n", "\n", "`MPI_Probe`\n", "\n", "**Point-to-Point**\n", "\n", "We didn't discuss the mysterious \"other\" ways of sending messages mentioned earlier. These are they\u2014other modes include explicit buffering (`B`), ready sending (so a matching receive must have already posted) (`R`), and synchronous sending (best for efficiency) (`S`). 
([ref](http://www.mcs.anl.gov/research/projects/mpi/sendmode.html))\n", "\n", "`MPI_Send`\n", "\n", "`MPI_Bsend`\n", "\n", "`MPI_Ssend`\n", "\n", "`MPI_Rsend`\n", "\n", "`MPI_Isend`\n", "\n", "`MPI_Ibsend`\n", "\n", "`MPI_Irsend`\n", "\n", "\n", "\n", "`MPI_Recv`\n", "\n", "`MPI_Irecv`\n", "\n", "\n", "\n", "`MPI_Sendrecv`\n", "\n", "**Collective**\n", "\n", "`MPI_Bcast`\n", "\n", "`MPI_Gather`\n", "\n", "`MPI_Scatter`\n", "\n", "`MPI_Allgather x{v}`\n", "\n", "`MPI_Allreduce`\n", "\n", "`MPI_Alltoall x{w,v}`\n", "\n", "x`MPI_Reduce_scatter`\n", "\n", "x`MPI_Scan`\n", "\n", "\n", "\n", "`MPI_Barrier`\n", "\n", "**Advanced**\n", "\n", "*Derived Datatypes and Operations*\n", "\n", "`MPI_Pack`\n", "\n", "`MPI_Type_vector`\n", "\n", "`MPI_Type_contiguous`\n", "\n", "`MPI_Type_commit`\n", "\n", "`MPI_Type_free`\n", "\n", "\n", "\n", "`MPI_Op_create`\n", "\n", "`MPI_User_function`\n", "\n", "`MPI_Op_free`\n", "\n", "Robey\u2019s Kahan sum, http://www.sciencedirect.com/science/article/pii/S0167819111000238\n", "\n", "\n", "\n", "`MPI_Comm_create`\n", "\n", "`MPI_Scan`\n", "\n", "`MPI_Exscan`\n", "\n", "`MPI_Comm_free`\n", "\n", "**I/O**\n", "\n", "`MPI_File_*`\n", "\n", "**One-Sided Communication**\n", "MPI_Put write to remote memory\n", "MPI_Get read from remote memory\n", "MPI_Accumulate reduction op on same memory across multiple tasks" ] }, { "cell_type": "code", "collapsed": false, "input": [], "language": "python", "metadata": {}, "outputs": [] } ], "metadata": {} } ] }