{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel Computing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Parallel computing is a programming method that **harnesses the power of multiple processors (or cores) at once**. Once of concern only to programmers of large supercomputers, modern computers now almost always have multi-core processors. However:\n", "\n", "> At the heart of efficient parallel code is fast serial code!!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### How many CPU cores do I have?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "using Hwloc\n", "Hwloc.num_physical_cores()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "(Note that `Sys.CPU_THREADS` may or may not be equal to the number above. It indicates the number of CPUs + Hyperthreads.)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Why go parallel?\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### **Amdahl's Law**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Naive expectation: I have 4 cores, give me my 4x speedup!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ ">If $p$ is the fraction of a code that can be parallelized than the maximal theoretical speedup by parallelizing on $n$ cores is given by $F(n) = 1/(1-p + p/n)$." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "using Plots\n", "F(p,n) = 1/(1-p + p/n)\n", "\n", "pl = plot()\n", "for p in reverse(sort(vcat(0.2:0.2:1, [0.9, 0.95])))\n", " plot!(pl, n -> F(p,n), 1:16, lab=\"$(Int(p*100))%\", lw=2,\n", " legend=:topleft, xlab=\"number of cores\", ylab=\"parallel speedup\", frame=:box)\n", "end\n", "pl" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel Computing in Julia" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Julia documentation link: [Parallel computing](https://docs.julialang.org/en/v1/manual/parallel-computing/index.html)\n", "\n", "There are many types of parallelism, some of which are (from micro to macro)\n", "\n", "* **Instruction level parallelism**\n", "* **Multi-threading** (process shared memory)\n", "* **Tasks aka Coroutines** aka Green threads (more like cooperative multitasking, process shared memory)\n", "* **Multi-Core processing** (maybe system shared memory)\n", "* **Distributed processing** (same as above but involving multiple machines)\n", "\n", "Julia provides (more or less) native support for all of these forms of parallel processing (same order as above)\n", "\n", "* `@simd` and [SIMD.jl](https://github.com/eschnett/SIMD.jl)\n", "* `Base.Threads.@threads` (experimental since 2015 but seems to be fine)\n", "* `@async`, `@sync`, `Channel`\n", "* `@spawnat`, `@fetch`, `RemoteChannel`, `SharedArray`, etc.\n", "* `@spawnat`, `@fetch`, `RemoteChannel`, `DArray`, `MPI.jl` etc.\n", "\n", "With scientific computing in mind, we will mainly focus on how to distribute a process through multiple cores or machines (our thp cluster for example), that is **Multi-Core processing** and **Distributed processing**. But before we can do so, we have to learn how to control Julia's control flow through tasks." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Tasks (Control flow)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default, Julia waits for every command to finish and run everything sequentially.\n", "\n", "Tasks are a control flow feature that allows computations to be **suspended** and resumed in a flexible manner. This feature is sometimes called by other names, such as coroutines, green or lightweight threads and cooperative multitasking.\n", "\n", "To me, the name **cooperative multitasking** is the most descriptive. Tasks are managed/scheduled by Julia and can sometimes be run in a quasi-parallel fashion.\n", "\n", "An important use case is **asynchronous I/O**, which is typically slow. Examples are\n", " * **multiple user input** (Why not already process some of the input?)\n", " * **data dumping to disk** (Maybe it's possible to continue a calculation?)\n", " * **receiving calculations from worker processes** (We'll need that below!)\n", "\n", "How do we execute commands asynchronously?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## `@async` and `@sync`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "(Based on [this](https://stackoverflow.com/questions/37287020/how-and-when-to-use-async-and-sync-in-julia/37287021#37287021) stackoverflow answer.)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "?@async" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What this means is that for whatever falls within its scope, Julia will start a task to then proceed to whatever comes next in the script **without waiting for the task to complete**." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time sleep(2);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time @async sleep(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Julia allows the script to proceed (and the `@time` macro to fully execute) without waiting for the task (in this case, sleeping for two seconds) to complete." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can use the `@sync` macro to synchronize, that is wait for, all encapsulated tasks. (see `?@sync`). " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time @sync @async sleep(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Of course, here it doesn't make much sense to write `@sync @async` - we could simply drop it altogether.\n", "\n", "A better example is the following." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time @sync begin\n", " @async sleep(2.0)\n", " @async sleep(2.0)\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@sync begin\n", " @async (sleep(2); println(\"Today is reverse day!\"))\n", " @async (sleep(1); println(\" class!\"))\n", " @async print(\"Hello\")\n", "end;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed processing: Multi-core" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Distributed computing in Julia means having **multiple separate Julia instances running on different cores** on the same or different machines.\n", "\n", "Data movement and communication between processes is explicit.\n", "\n", "Let's focus on the *multi-core* case (your laptop/desktop) and save some cluster fun for later." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Master-worker model\n", "\n", "Julia uses a *master-worker* paradigm for its native distributed parallelism.\n", "\n", "One master process coordinates all the worker processes, which perform the actual computations.\n", "\n", "By default, Julia starts with one process on one core. If this single process is all we have, than it is both the master and the worker." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "using Distributed # Loading all tools that we need for distributed computing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nprocs()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nworkers() # the master is considered a worker as long as there are no real workers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To increase the number of workers, i.e. Julia processes, from within a Julia session we can use `addprocs`.\n", "\n", "Alternatively, when starting Julia from the command line, one can use the `-p` option. Example,\n", "\n", "```\n", "julia -p 4\n", "```\n", "\n", "will start Julia with 5 processes, 1 master and 4 workers." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "addprocs(4) # I have 4 cores, so let's add 4 worker processes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Every process has a Julia internal `pid` (process id). The master is always 1. You can get the workers pids from `workers()`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workers()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that the 4 worker's pids aren't necessarily 2, 3, 4 and 5. Let's remove the processes and add them once more." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rmprocs(workers()) # rmprocs(array of pids of worker processes to remove)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nworkers() # only the master is left" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "addprocs(4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workers()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## One master to rule them all - `@spawn`, `@spawnat`, `@fetch`, `@fetchfrom`, `@everywhere`..." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To execute commands and start computations on workers we can use the following macros\n", "\n", "* `@spawn`: run a command or a code block on any worker and return a `Future` to it's result. It's basically a version of `@async` for remote processes.\n", "* `@spawnat`: same as `@spawn` but one can choose a specific worker by providing its pid." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Example:** Let's say we would like to generate a random matrix on one of the workers." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@spawn rand(2,2) # basically @async for remote process, i.e. returns immediately" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "result = @spawn rand(2,2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fetch(result) # blocks, like @sync" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because the combination of spawning at fetching is so common, there is `@fetch` which combines them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@fetch rand(2,2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Which worker did the work?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@fetch begin\n", " println(myid());\n", " rand(2,2)\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using `@spawnat` and `@fetchfrom` we can delegate the work to a specific worker." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@fetchfrom 7 begin\n", " println(myid());\n", " rand(2,2)\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can use `@sync` as a blocker to wait for all workers to complete their tasks." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@sync begin\n", " pids = workers()\n", " @spawnat pids[1] (sleep(2); println(\"Today is reverse day!\"))\n", " @spawnat pids[2] (sleep(1); println(\" class!\"))\n", " @spawnat pids[3] println(\"Hello\")\n", "end;\n", "println(\"Done!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ok, now that we understood all that, let's delegate a *complicated* calculation" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "using Random\n", "\n", "function complicated_calculation()\n", " sleep(1) # so complex that it takes a long time :)\n", " randexp(5)\n", "end\n", "\n", "@fetch complicated_calculation()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What happened?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Think of every worker as a separate Julia instance.**\n", "\n", "We only defined `complicated_calculation()` on the master process. The function doesn't exist on any of the workers yet.\n", "\n", "The macro `@everywhere` comes for the rescue." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere begin # execute this block on all workers\n", " using Random\n", " \n", " function complicated_calculation()\n", " sleep(1)\n", " randexp(5) # lives in Random\n", " end\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@fetch complicated_calculation()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data movement" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There is a crucial difference between the following two pieces of code. Can you guess what it is?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function method1()\n", " A = rand(100,100)\n", " B = rand(100,100)\n", " C = @fetch A^2 * B^2\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function method2()\n", " C = @fetch rand(100,100)^2 * rand(100,100)^2\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's benchmark them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "using BenchmarkTools\n", "@btime method1();\n", "@btime method2();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Method 1 is slower, because `A` and `B` are created on the master process, transferred to a worker, and squared and multiplied on the worker process before the result is finally transferred back to the master.\n", "\n", "Method 2, on the other hand, creates, squares, and multiplies the random matrix all on the work process and only submits the result to the master.\n", "\n", "Hence, `method1` is **transferring 3x as much data** between the master and the worker!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Data movement is crucial!**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this toy example, it's rather easy to identify the faster method.\n", "\n", "In a real program, however, understanding data movement does require more thought and likely some measurement.\n", "\n", "For example, if the first process needs matrix `A` in a follow-up computation then the first method might be better in this case. Or, if computing `A` is expensive and only the current process has it, then moving it to another process might be unavoidable." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Computer latency at a human scale\n", "\n", "To understand why thinking about data is important it's instructive to look at the time scales involved in data access.\n", "\n", "\n", "\n", "(taken from https://www.prowesscorp.com/computer-latency-at-a-human-scale/)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Avoid globals (once more)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "myglobal = 4" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function whohas(s::String)\n", " @everywhere begin\n", " var = Symbol($s)\n", " if isdefined(Main, var)\n", " println(\"$var exists.\")\n", " else\n", " println(\"Doesn't exist.\")\n", " end\n", " end\n", " nothing\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "whohas(\"myglobal\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@fetchfrom 6 myglobal+2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "whohas(\"myglobal\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Globals get copied to workers and continue to exist as globals even after the call.\n", "\n", "This could lead to memory accumulation if many globals are used (just as it would in a single Julia session).\n", "\n", "It's better to avoid them." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Explicit data movement: `Channel` and `RemoteChannel`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Channels in Julia are constructs to explicitly exchange data between workers.\n", "\n", "They implement `put!`, `take!`, `fetch`, `isready` and `wait` methods." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# ?Channel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ch = Channel{Int}(5) # a channel that can hold up to 5 integers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "isready(ch) # something in the channel?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "put!(ch, 3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "isready(ch)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "take!(ch)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "isready(ch)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "put!(ch, 4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fetch(ch) # basically take without a bang" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "take!(ch)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Be careful, `take!` and `put!` are blocking if the channel is empty or full!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "isready(ch)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# take!(ch) if we execute this, while isready(ch) == false, the current Julia session will hang." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Channels for inter-process data movement: `RemoteChannel`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* A `Channel` is local to a process. Worker 2 cannot directly refer to a `Channel` on worker 3 and vice-versa.\n", "\n", "\n", "* A `RemoteChannel`, however, can put and take values across workers. A `RemoteChannel` can be thought of as a handle to a `Channel`.\n", "\n", "\n", "* Any process with a reference to a `RemoteChannel` can put and take items from the channel. Data is automatically sent to (or retrieved from) the process a `RemoteChannel` is associated with.\n", "\n", "\n", "* The process id, pid, associated with a `RemoteChannel` identifies the process where the backing store, i.e., the backing Channel exists." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nworkers()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "addprocs(4)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "?RemoteChannel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# creates a channel on the second worker process\n", "# create a RemoteChannel handle to this channel on the master process\n", "const mychannel = RemoteChannel(()->Channel{Int}(10), workers()[2])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "whohas(\"mychannel\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# One could create a global constant mychannel everywhere\n", "@everywhere const mychannel = $mychannel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "whohas(\"mychannel\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, as we said many times before, one should generally try to avoid globals. The following is preferable." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function do_something()\n", " rc = RemoteChannel(()->Channel{Int}(10)) # lives on the master\n", " @sync for p in workers()\n", " @spawnat p put!(rc, myid())\n", " end\n", " rc\n", "end\n", "\n", "r = do_something()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "isready(r)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while isready(r)\n", " @show take!(r)\n", "end" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The ecosystem also contains a couple of tools, that make data transfer even simpler. See for example [ParallelDataTransfer.jl](https://github.com/ChrisRackauckas/ParallelDataTransfer.jl/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallelizing the easy way - `@distributed` and `pmap`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So far we have seen the build block of commands for distributed computing in Julia. Having scientific computing in mind, one might not always want to think about how to distribute the work and explicitly spawn tasks.\n", "\n", "Also, fortunately, many useful parallel computations do not require (much) data movement. A common example is a direct Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. (We'll get to that later!)\n", "\n", "Julia provides convenience macros to\n", " * Parallelize loops (`@distributed`)\n", " * Apply a function to all elements in some collection (`pmap`)\n", " \n", "Let's explore these!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed loops (`@distributed`)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "using Distributed, BenchmarkTools; rmprocs(workers()); addprocs(4); nworkers()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# serial version - count heads in a series of coin tosses\n", "function add_serial(n)\n", " c = 0\n", " for i = 1:n\n", " c += rand(Bool)\n", " end\n", " c\n", "end\n", "\n", "@btime add_serial(200_000_000);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is trivially parallelizable since the loop iterations are independent of each other. We can distribute coin tosses over a couple of workers.\n", "\n", "Afterwards we combine the results, that is we sum them up. The combination process is generally called a *reduction*, and in this case `sum` is the *reducer function*.\n", "\n", "To distribute the for loop over worker processes Julia provides the `@distributed` macro:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "?@distributed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# distributed version\n", "function add_distributed(n)\n", " c = @distributed (+) for i in 1:n\n", " Int(rand(Bool))\n", " end\n", " c\n", "end\n", "\n", "@btime add_distributed(200_000_000);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The distributed version is about **4x faster**, which is all we could hope for." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see who is doing the work" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# verbose distributed version\n", "function add_distributed(n)\n", " c = @distributed (+) for i in 1:n\n", " x = Int(rand(Bool))\n", " println(x);\n", " x\n", " end\n", " c\n", "end\n", "\n", "add_distributed(8);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Apparently, the work is evenly distributed between the workers. By using `@distributed` we let Julia decide how to split up the work and can't control it ourselves." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A common mistake when using `@distributed` is the following:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function f(n)\n", " a = 0\n", " @distributed (+) for i in 1:n\n", " a += 1\n", " end\n", " a\n", "end\n", "\n", "a = f(10);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What do you expect the value of `a` to be?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can (sort of) see what's happening by making everything global" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a = 0\n", "@distributed (+) for i in 1:10\n", " println(\"1\")\n", " global a += 1\n", "end;" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere @show a" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The variable `a` gets copied to the worker processes as it is referenced in the distributed loop. \n", "\n", "Every worker will then increment its copy of `a`.\n", "\n", "However, we do not save the result of the reduction (sum) but instead return `a` from the master process, which hasn't been altered at all.\n", "\n", "Corrected version:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function f2(n)\n", " a = @distributed (+) for i in 1:n\n", " 1\n", " end\n", " a\n", "end\n", "\n", "a = f2(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### What if I don't want to reduce?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similar to the mistake above, the following example might not have the effect one expects. **Why?**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "a = zeros(10)\n", "@distributed for i = 1:10\n", " a[i] = i\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere @show a" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that `@distributed` without a reduction function returns a `Task`. It is basically a distributed version of `@spawn` for all the iterations." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## `SharedArray`s" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To actually make all processes operate on the same array, one can use a `SharedArray`.\n", "\n", "Note that a `SharedArray` only works if the **processes live on the same host**.\n", "\n", "The constructor of a SharedArray is\n", "\n", "```julia\n", "SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])\n", "```\n", "\n", "which creates an `N`-dimensional shared array of a (bits) type `T` and size `dims` across the processes specified by `pids`.\n", "\n", "(If an `init` function, of signature `initfn(S::SharedArray)`, is specified, it is called on all the participating workers. You can specify that each worker runs the init function on a distinct portion of the array, thereby parallelizing initialization.)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere using SharedArrays # must be loaded everywhere" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "A = rand(2,3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "S = SharedArray(A)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ok, now that we know how to create and fill our `SharedArray` we can create a parallel fill function:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function fill_shared_problematic(N)\n", " S = SharedMatrix{Float64}(N,N)\n", " @distributed for i in 1:length(S)\n", " S[i] = i\n", " end\n", " S\n", "end\n", "\n", "fill_shared_problematic(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Why is the method in its current form problematic? Try to find out yourself by going to larger `N` and, for example, inspecting the minimum of the returned `SharedArray`!*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Going to larger matrix sizes...." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function fill_shared_problematic(N)\n", " S = SharedMatrix{Int64}(N,N)\n", " @distributed for i in 1:length(S)\n", " S[i] = i\n", " end\n", " S\n", "end\n", "\n", "S = fill_shared_problematic(100)\n", "minimum(S)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note how sometimes the array isn't completely filled but still contains zeros. This is because it isn't filled **yet**!\n", "\n", "Check again!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "minimum(S)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can use `@sync` to synchronize our distributed for loop." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function fill_shared_problematic(N)\n", " S = SharedMatrix{Int64}(N,N)\n", " @sync @distributed for i in 1:length(S) # added @sync here\n", " S[i] = i\n", " end\n", " S\n", "end\n", "\n", "S = fill_shared_problematic(100)\n", "minimum(S)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ok, let's **benchmark** this for a larger matrix size" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# regular array\n", "function fill_regular(N)\n", " A = Matrix{Int64}(undef,N,N)\n", " for i in 1:length(A)\n", " A[i] = i\n", " end\n", " A\n", "end\n", "\n", "@time fill_regular(10000);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# shared array\n", "function fill_shared(N)\n", " S = SharedMatrix{Int64}(N,N)\n", " @sync @distributed for i in 1:length(S)\n", " S[i] = i\n", " end\n", " S\n", "end\n", "\n", "@time fill_shared(10000);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is of course just filling an array.\n", "\n", "If there were actual calculations it might actually be beneficial to distribute the work across workers." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallel map: `pmap`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sometimes we merely wish to apply a function to all all elements in a collection.\n", "\n", "For those cases, Julia provides the `pmap` (parallel map) function." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Say, we want to compute the singular values of a bunch of larger matrices in parallel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "using Distributed, BenchmarkTools; rmprocs(workers()); addprocs(Hwloc.num_physical_cores()); nworkers()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere using LinearAlgebra\n", "\n", "M = Matrix{Float64}[rand(200,200) for i = 1:10];\n", "\n", "pmap(svdvals, M)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Check that really all of the workers participated\n", "pmap(m->begin println(myid()); svdvals(m) end, M);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "function svds_loop(M)\n", " svds = Vector{Vector{Float64}}(undef, 10)\n", " for (i, m) in enumerate(M)\n", " svds[i] = svdvals(m)\n", " end\n", " svds\n", "end\n", "\n", "@time svds_loop(M);\n", "@time svdvals.(M);\n", "@time pmap(svdvals, M);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### When to choose which?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Julia's pmap is designed for the case where\n", "* each function call does a **large amount of work** and/or\n", "* the **workload is non-uniform**.\n", "\n", "In contrast, `@distributed` can handle situations where\n", "* **each iteration is tiny**, i.e. perhaps only summing two numbers and/or\n", "* each iteration **takes about the same time**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Scaling things up: distributed computing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So far we have worked on multiple cores on a single machine, your laptop for example.\n", "\n", "Processes can live on other machines as well! This allows us to distribute our computation across computer clusters.\n", "\n", "In principle, the plan of action is the same as in the multi-core case. However, we have to take into account the different memory situation. In particular, **data movement is expensive** and we won't be able to use `SharedArray`s." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rmprocs(workers()) # fresh start" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating workers on the cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Adding processes on different machines is not much harder than adding them on your local machine. In the following we will take the last example, calculating singular values of a bunch of matrices, and distribute it over multiple computers in our thp network.\n", "\n", "In Julia, starting worker processes is handled by [ClusterManagers](https://docs.julialang.org/en/stable/manual/parallel-computing/#ClusterManagers-1).\n", "\n", "* The default one is `LocalManager`. It is automatically used when running `addprocs(i::Integer)` and we have implicitly used it already!\n", "* The one we are going to use for the THP cluster is `SSHManager`. It is automatically used when running `addprocs(hostnames::Array)`.\n", "\n", "Other cluster managers for SLURM, PBS, and others are provided in [ClusterManagers.jl](https://github.com/JuliaParallel/ClusterManagers.jl)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In principle, starting processes on other computers can be done by `addprocs([\"l93\", \"l94\"])`, where `\"l93\"` and `\"l94\"` are hostnames. The only requirement is a **passwordless ssh access** to all specified hosts." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*Demonstrate in terminal from thp node*\n", "\n", "```julia\n", "using Distributed\n", "\n", "addprocs([\"l93\", \"l94\"])\n", "\n", "@everywhere println(gethostname())\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "One can also start multiple processes on different machines:\n", "```julia\n", "addprocs([(\"l93\", 2), (\"l94\", 3)]) # starts 2 workers on l92 and 3 workers on l93\n", "\n", "# Use :auto to start as many processes as CPUs are available\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By default, `addprocs` expects the julia executable in the same folder as on the master computer (remember: workers are independent Julia processes). It will also try to `cd` to the same folder.\n", "\n", "In my case this would be" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@show pwd();\n", "@show Sys.BINDIR;" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Both folders don't exist in my thp account (those are linux machines!), so I'll have to tell Julia to use different paths.\n", "\n", "Also, as per thp cluster guidelines one **(!) must (!) run computations on other thp computer with `nice -19` priority setting**!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating `nice -19` workers and specifying directories " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can see from `?addprocs`, `addprocs` takes a bunch of keyword arguments, two of which are of particular importance.\n", "\n", "* `dir`: working directory of the worker process\n", "* `exename`: path to julia executable (potentially augmented with pre-commands)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "params = (exename=`nice -19 /home/bauer/bin/julia-1.5.3/bin/julia --project=/home/bauer/JuliaNRW21`, dir=\"/home/bauer\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "addprocs([(\"l93\", :auto)]; params...)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere println(gethostname())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rmprocs(workers())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Ok, let's get some resources :)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "machines = [\"l93\", \"l94\", \"l96\"];\n", "\n", "procs_per_machine = :auto; # :auto for n = # cpus\n", "\n", "jobs = [(m,procs_per_machine) for m in machines]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "addprocs(jobs; params...)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere println(gethostname())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere using LinearAlgebra\n", "\n", "@time x = pmap(svdvals, M);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed arrays (`DArray`)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Github: https://github.com/JuliaParallel/DistributedArrays.jl\n", "\n", "In a `DArray`, each process has local access to just a chunk of the data, and no two processes share the same chunk. Processes can be on different hosts.\n", "\n", "Distributed arrays are for example useful if\n", "\n", "* Expensive calculations should be performed in parallel on parts of the array on different hosts.\n", "* The data doesn't fit into the local machines memory (Loading big files in parallel)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@everywhere using DistributedArrays, LinearAlgebra" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "M = Matrix{Float64}[rand(200,200) for i = 1:10];" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "D = distribute(M)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Which workers hold parts of D?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "procs(D)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Which parts do they hold?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "localpart(D) # the master doesn't hold anything" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Which parts do they hold?\n", "for p in workers()\n", " display(@fetchfrom p localpart(D))\n", " display(@fetchfrom p DistributedArrays.localindices(D)) # DistributedArrays. necessary because of SharedArrays above\n", "end" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time Msquared = map(svdvals, M);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time Dsquared = map(svdvals, D);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time Psquared = pmap(svdvals, M);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Msquared ≈ Dsquared" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Dsquared ≈ Psquared" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But remember, for small operations the data movement can (and will) exceed the benefit of parallelizing the computation!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@time map(sum, M);\n", "@time map(sum, D);" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Stop worker processes!\n", "rmprocs(workers())" ] } ], "metadata": { "@webio": { "lastCommId": null, "lastKernelId": null }, "kernelspec": { "display_name": "Julia 1.5.3", "language": "julia", "name": "julia-1.5" }, "language_info": { "file_extension": ".jl", "mimetype": "application/julia", "name": "julia", "version": "1.5.3" } }, "nbformat": 4, "nbformat_minor": 4 }