{ "metadata": { "name": "" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Introduction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* This talk is available on github at [http://github.com/jseabold/zorro](http://github.com/jseabold/zorro)\n", "* Clone using git\n", "\n", "```\n", " git clone git://github.com/jseabold/zorro.git zorro-talk\n", "```\n", "\n", "* Checkout using subversion\n", "\n", "```\n", " svn checkout https://github.com/jseabold/zorro zorro-talk\n", "```\n", "\n", "* Start the notebook\n", "\n", "```bash\n", " cd zorro-talk\n", " ipython notebook --notebook-dir=.\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Most of this presentation is taken from the [IPython parallel computing documentation](http://ipython.org/ipython-doc/dev/parallel/)\n", "* And from talks given over the years by the core development team of [@minrk](https://twitter.com/minrk), [@ellisonbg](https://twitter.com/ellisonbg), and [@fperez_org](https://twitter.com/fperez), among many others\n", "* IPython is well [documented](http://ipython.org/), including [video tutorials](http://ipython.org/videos.html)\n", "* There is a great support network for IPython on [stackoverflow](http://stackoverflow.com/questions/tagged/ipython) and on their [mailing list](http://mail.scipy.org/mailman/listinfo/ipython-user)\n", "* This talk is created using the [IPython Notebook](http://ipython.org/notebook.html), which also support parallelism" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Talk Dependencies" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* IPython\n", " - Notebooks dependencies\n", " - Parallel dependencies\n", "* NumPy\n", "* SciPy\n", "* sympy (optional)" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import numpy as np\n", "import matplotlib.pyplot as plt" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Aims" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Assume familiarity with basic Python\n", "* Cover some basic concepts for parallel (scientific) computing \n", "* Provide context for IPython.parallel\n", "* Fill-in documentation gaps for the getting started hurdle" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Some (Biased) Options for (Scientific Computing) Parallelism in Python" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Symmetric Multiprocessing (SMP) (Shared Main-Memory)\n", "\n", "* [multiprocessing](http://docs.python.org/2/library/multiprocessing.html)\n", " * appropriate for CPU bound problems\n", " * takes advantage of multiple CPUs/cores\n", " * runs in separate memory space\n", " * OS handles process scheduling\n", " * Would take extra handling to use on a compute cluster\n", "* [threading](http://docs.python.org/2/library/threading.html)\n", " * appropriate for I/O bound problems\n", " * runs in the same memory space\n", " * threading library handles scheduling\n", " * GIL can get in the way here on CPython - keeps you from writing to the same memory address at the same time from two different threads\n", " * optimized C extension libraries will release the GIL\n", "* [joblib](http://pythonhosted.org/joblib/index.html)\n", " * memoization\n", " * embarrassingly parallel for loops\n", " * trivial to use\n", " * SMP-only (at the moment)\n", " \n", "#### Cluster Computing 
(Memory not available to all processors)\n", " \n", "* [IPython.parallel](http://ipython.org/ipython-doc/dev/parallel/)\n", " * This is the focus of this notebook\n", " * Why IPython?\n", " * Designed by computational scientists for computational scientists\n", " * Design goal to make things easy and get out of the way\n", " * Interactive usage for debugging/developing\n", "* Other options\n", " * Pyro\n", " * pp (Parallel Python)\n", " * Disco (Map-Reduce)\n", " * [...](https://wiki.python.org/moin/ParallelProcessing)" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "Architecture" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython.display import Image\n", "Image(filename=\"./parallel_architecture400.png\")" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Three Core Parts\n", "\n", "1. The **Client**\n", " - This is what you use to run your parallel computations\n", " - You will interact with a **View** on the client\n", " - The type of View depends on the execution model you are using\n", "2. The **Engine(s)**\n", " - An IPython \"kernel\" where the code is executed\n", " - Listens for instructions over a network connection\n", "3. The **Controller**\n", " - The IPython controller consists of 1) the **Hub** and 2) the **schedulers**\n", " - The **Hub** is the central process that monitors everything\n", " - The **schedulers** take care of getting your code where it should go\n", " - The controller is the go-between for the Client and the Engines" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "IPython client and views" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* The **`Client`** object connects to the cluster\n", "* For each execution model there is a corresponding **`View`**\n", "* For example, there are two basic views:\n", " - The `DirectView` class for explicitly running code on a particular engine or set of engines\n", " - The `LoadBalancedView` class for running your code on the 'best' engine(s)\n", "* You can use as many views as you like, even several at the same time\n", "* You can read much more about the details of IPython parallel and views in the [documentation](http://ipython.org/ipython-doc/dev/parallel/parallel_details.html)" ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Getting Started" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Take advantage of the multiple processors on your local machine\n", "* Say you have 4 processors\n", "* How many processors do I have available?" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from multiprocessing import cpu_count\n", "print cpu_count()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Start a controller and 4 engines with the **ipcluster** program\n", "* At the command line type\n", "\n", " `ipcluster start -n 4`\n", "\n", "* Or, in the notebook at the dashboard"
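] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Note: the `Client(profile='hpc')` call used below assumes the HPC profile set up later in this talk\n", "* If you started the engines with the plain `ipcluster start -n 4` above (the default profile), a minimal sketch of checking the cluster looks like this instead:\n", "\n", "```python\n", "from IPython import parallel\n", "\n", "rc = parallel.Client()  # default profile started by `ipcluster start -n 4`\n", "print rc.ids            # should list the 4 engine ids, e.g. [0, 1, 2, 3]\n", "```"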
] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Did it work?" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython import parallel\n", "\n", "rc = parallel.Client(profile='hpc')\n", "rc.block = True" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "rc.ids" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def power(a, b):\n", " return a**b" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Create a `DirectView` of engine 0\n", "* The Client supports slicing" ] }, { "cell_type": "code", "collapsed": false, "input": [ "dv = rc[0]\n", "dv" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dv.apply(power, 2, 10)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Recall that slice notation allows you to leave out the start and stop values" ] }, { "cell_type": "code", "collapsed": false, "input": [ "X = [1, 2, 3, 4]\n", "X" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "X[:]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use this to send code to all the engines" ] }, { "cell_type": "code", "collapsed": false, "input": [ "rc[:].apply_sync(power, 2, 10)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Python's built-in map function allows you to call a function over a sequence of arguments" ] }, { "cell_type": "code", "collapsed": false, "input": [ "map(power, [2]*10, range(10))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In parallel, you use view.map" ] }, { "cell_type": "code", "collapsed": false, "input": [ "view = rc.load_balanced_view()\n", "view.map(power, [2]*10, range(10))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "When to Parallelize?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " \"Premature optimization is the root of all evil.\"\n", " -Donald Knuth\n", " \n", "* Write code how you want it to look, then optimize\n", "* *But* be smart" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Profile Your Code" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* This is important\n", "* Know where your bottlenecks are\n", "* Python has a great built-in profiler [cProfile](http://docs.python.org/2/library/profile.html)\n", " * Visualize with [RunSnakeRun](http://www.vrplumber.com/programming/runsnakerun/)\n", "* There is also [line_profiler](https://pypi.python.org/pypi/line_profiler) by Robert Kern\n", "* [kcachegrind](http://kcachegrind.sourceforge.net/html/Home.html) is another option that includes visualization\n", "* Use these!"
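] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* For example, a minimal sketch of profiling with the built-in cProfile and pstats modules (the function and file name here are made up for illustration):\n", "\n", "```python\n", "import cProfile\n", "import pstats\n", "\n", "def slow_sum(n):\n", "    # deliberately slow pure-Python loop\n", "    total = 0\n", "    for i in range(n):\n", "        total += i ** 2\n", "    return total\n", "\n", "cProfile.run('slow_sum(10**6)', 'profile.out')  # write the stats to a file\n", "stats = pstats.Stats('profile.out')\n", "stats.sort_stats('cumulative').print_stats(5)   # show the 5 most expensive calls\n", "```"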
] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Write Smart Code" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Get the speed-ups you can first\n", "* If you're working with a dynamic language, **don't write loops**\n", " * Take advantage of libraries like NumPy\n", "* If you can, compile it\n", " * Write bottlenecks in a compiled language like [C/C++](http://docs.python.org/2/extending/extending.html) or [Fortran](http://www.scipy.org/F2py)\n", " * Really. Look at this [Python-Fortran 90 side-by-side comparison](http://fortran90.org/src/rosetta.html)\n", " * Use [Cython](http://cython.org)" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "What kind of speed-up can I expect?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Speed-ups follow Amdahl's law\n", "\n", "$$\\frac{1}{(1-P)+\\frac{P}{S}}$$\n", "\n", "* P is the proportion of your code that can be parallelized\n", "* S is the speed-up achieved on that parallelizable portion\n", " \n", "$$S=\\frac{T_1}{T_p}$$\n", "\n", "where $T_1$ is the time it takes to run the serial code and $T_p$ is the time it takes using $p$ processors\n", "\n", "* Ideal speed-up is linear scaling $S=p$\n", "* For example, say only 40% of your code can be parallelized\n", "* The parallel parts get a speed-up of $3=\\frac{9min}{3min}$ using 4 processors\n", "* Amdahl's law says you can get an overall speed-up of \"only\" $1.36\\times$" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* If the parallel portion scales ideally ($S=N$), this can be stated as\n", "\n", "$$\\frac{1}{(1-P)+\\frac{P}{N}}$$\n", "\n", "where $N$ is the number of processors\n", "\n", "* In the limit, the maximum speed-up is $\\frac{1}{1-P}$\n", "* If P = 90%, the most you can get is a speed-up of 10\n", "* There are strongly decreasing returns to $N$" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Take-aways\n", "\n", "1. You're fine using a small number of processors\n", "2. Or hope that $P$ is very high -- so-called embarrassingly parallel problems\n", " * This tutorial focuses on these cases" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Things to be aware of" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Overhead of parallel framework vs. 
function execution time\n", "* Pushing and pulling results over the network from the engines takes time\n", "* Functions faster than ~100ms need not apply" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "The Direct Interface" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* The direct interface lets the user interact explicitly with each engine\n", "* First, create a direct view" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython import parallel\n", "\n", "rc = parallel.Client(profile='hpc')" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Above we saw the use of `map` and `apply` in parallel\n", "* You may have also noticed this bit of code" ] }, { "cell_type": "code", "collapsed": false, "input": [ "rc.block = True" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* In blocking mode, whenever you execute some code on the engines the controller waits until this code is done executing\n", "* Non-blocking mode is the default" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Get access to all the engines" ] }, { "cell_type": "code", "collapsed": false, "input": [ "dview = rc.direct_view()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* You can block on a call-by-call basis as well, by using apply_sync for synchronous execution" ] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.block = False\n", "\n", "dview[\"a\"] = 5 # shorthand for push\n", "dview[\"b\"] = 7\n", "\n", "dview.apply_sync(lambda x: a + b + x, 27)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* There is also `apply_async`\n", "* Above you'll notice that the assignments defined these variables on the engines in a dictionary-like manner" ] }, { "cell_type": "code", "collapsed": false, "input": [ "d = {}\n", "d[\"a\"] = 5" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "d" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* This is shorthand for `push`ing python objects to the engines\n", "* DirectViews provide dictionary-like access by `key` or by using `get` and `update` like built-in dicts\n", "* This can also be done explicitly with `push`\n", "* `push` takes a dictionary" ] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.push(dict(msg=\"Hi, there\"), block=True)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.block = True" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Python commands can be executed as strings on specific engines" ] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.execute(\"x = msg\")" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dview[\"x\"] # shorthand for pull" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* You can also use pull explicitly to get back from the engine" ] }, { "cell_type": "code", "collapsed": false, "input": [ "#rc[::2].execute(\"c = a + b\")\n", "# or\n", "dview.execute(\"c = a + b\", targets=[0,2])\n", 
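"# targets= picks which engine ids run the statement (here we split the 4 engines into even and odd ids)\n",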
"#rc[1::2].execute(\"c = a - b\")\n", "# or\n", "dview.execute(\"c = a - b\", targets=[1,3])" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.pull(\"c\")" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* If we were working in non-blocking mode, we would get an [`AsyncResult` object](http://ipython.org/ipython-doc/dev/parallel/asyncresult.html) back immediately" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def wait(t):\n", " import time\n", " tic = time.time()\n", " time.sleep(t)\n", " return time.time() - tic" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "ar = dview.apply_async(wait, 2)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "type(ar)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* We use its get method to get the result\n", "* Calling get blocks" ] }, { "cell_type": "code", "collapsed": false, "input": [ "ar.get()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* If we weren't quite so patient, we could ask if our tasks are done by using the `ready` method" ] }, { "cell_type": "code", "collapsed": false, "input": [ "ar = dview.apply_async(wait, 15)\n", "\n", "print ar.ready()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Or we can ask for the result, waiting a maximum of, say, 5 seconds" ] }, { "cell_type": "code", "collapsed": false, "input": [ "ar.get(5)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Often, we can't go on until some results are done\n", "* For this, we can use the `wait` method\n", "* `wait` can take an iterable of `AsyncResults`" ] }, { "cell_type": "code", "collapsed": false, "input": [ "result_list = [dview.apply_async(wait, 3) for i in range(5)]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "result_list" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.wait(result_list)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "result_list[4].get()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Scatter and Gather" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* You can use `scatter` to partition an iterable across engines\n", "* `gather` pulls the results back\n", "* You can use this to do parallel list comprehensions as below\n", "* Sometimes this is more convenient than map" ] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.scatter('x', range(64))\n", "\n", "%px y = [i**10 for i in x]\n", "\n", "y = dview.gather('y')\n", "\n", "print y[:10]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* The `%` indicates that we are using an [IPython 'magic' function](http://ipython.org/ipython-doc/dev/interactive/tutorial.html#magic-functions)\n", "* The available parallel magics are listed in the 
[documentation](http://ipython.org/ipython-doc/dev/parallel/magics.html)" ] }, { "cell_type": "heading", "level": 3, "metadata": {}, "source": [ "The Task Interface" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* [The Task interface](http://ipython.org/ipython-doc/dev/parallel/parallel_task.html) allows you to use your the engines as a system of workers\n", "* You no longer have direct access to the individual engines\n", "* If your tasks are easily segmented into pieces that do not depend on each other, the Task Interface may be ideal\n", "* *However*, you can specify complex dependencies to describe task execution order\n", "* You can use many standard scheduling paradigms for how tasks should be run or define your own\n", "* I am not going to discuss the task interface in detail" ] }, { "cell_type": "code", "collapsed": false, "input": [ "rc = parallel.Client(profile='hpc')\n", "\n", "lview = rc.load_balanced_view()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.block = True\n", "\n", "parallel_result = lview.map(lambda x:x**10, range(32))\n", "\n", "print parallel_result[:10]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Examples" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Poor Man's Global Optimization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* In reality, you'll rarely ever want to proceed this way for difficult optimization problems\n", "* This shows how you could farm out optimization tasks after some sort of scatter-search" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Function with multiple minima\n", "* From Judge, *et. 
al.* The Theory and Practice of Econometrics\n", "* Nonlinear least squares 2 local minima and 20 generated observations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "$$y_t = \\theta_1 + \\theta_2x_{t2} + \\theta_2^2x_{t3} + \\epsilon_t$$" ] }, { "cell_type": "code", "collapsed": false, "input": [ "y = np.array([4.284, 4.149, 3.877, .533, 2.211, 2.389, \n", " 2.145, 3.231, 1.998, 1.379, 2.106, 1.428, \n", " 1.011, 2.179, 2.858, 1.388, 1.651, 1.593,\n", " 1.046, 2.152])\n", "\n", "x = np.array([.286, .645, .973, .585, .384, .310, \n", " .276, .058, .973, .455, .543, .779, \n", " .957, .259, .948, .202, .543, .028, \n", " .797, .099, .936, .142, .889, .296, \n", " .006, .175, .828, .180, .399, .842, \n", " .617, .039, .939, .103, .784, .620, \n", " .072, .158, .889, .704]).reshape(20,2)\n", "\n", "x = np.column_stack((np.ones(len(x)), x))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print y" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print x" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def func(params, y, x):\n", " import numpy as np\n", " theta = np.r_[params[0], params[1], params[1]**2]\n", " return y - np.dot(x,theta)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "theta1, theta2 = np.mgrid[-3:3:100j,-3:3:100j]\n", "Z = [np.sum(func([i,j], y, x)**2) for i,j in \n", " zip(theta1.flatten(), theta2.flatten())]\n", "Z = np.asarray(Z).reshape(100,100)\n", "\n", "fig, ax = plt.subplots(figsize=(6, 6))\n", "\n", "V = [16.1, 18, 20, 20.5, 21, 22, 24, \n", " 25, 30, 40, 50, 100, 200, 300, \n", " 400, 500, 600, 700]\n", "\n", "c = ax.contour(theta1, theta2, Z, V)\n", "im = ax.imshow(Z, interpolation='bilinear', origin='lower',\n", " cmap=plt.cm.BrBG, extent=(-3,3,-3,3))\n", "cb = plt.colorbar(c)\n", "ax.set_xlabel(r'$\\theta_1$')\n", "ax.set_ylabel(r'$\\theta_2$')\n", "#ax.scatter([.864737, 2.35447, 2.49860664], [1.235748, -.319186, -0.98261242],\n", "ax.scatter([.864737, 2.49860664], [1.235748, -0.98261242],\n", " marker=\"x\", s=30, color='black', lw=2)\n", "ax.set_title('Loci of objective function')\n", "ax.set_xlim([-3,3])\n", "ax.set_ylim([-3,3])\n", "ax.grid(False)\n", "plt.show()" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "x1 = [0,0] # good\n", "x2 = [2.354471, -.319186] # bad\n", "x3 = [1, 1] # good\n", "x4 = [-3.17604581, -0.680944] # bad\n", "# assume we got these in some sane way\n", "xs = np.random.normal(0, 4, size=(20, 2))\n", "\n", "starts = np.row_stack((x1, x2, x3, x4, xs))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "def optimize_func(start_params):\n", " return leastsq(func, start_params, args=(y, x))[0]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dview = rc[:]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "with dview.sync_imports():\n", " from scipy.optimize import leastsq\n", " import numpy as np" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "dview.push(dict(func=func, y=y, x=x));" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": 
"code", "collapsed": false, "input": [ "results = dview.map_sync(optimize_func, starts)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "opt_func = lambda params : np.sum(func(params, y, x)**2)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "i_best = np.argmin(map(opt_func, np.array([result for result in results])))" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print results[i_best]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Moving to a Cluster" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Tools like IPython and StarCluster takes care of things for you\n", "* Starts the engines on the compute nodes for you\n", " - Most likely 1 per core" ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "StarCluster Configuration to use Amazon's EC2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Read more about this [here](http://ipython.org/ipython-doc/dev/parallel/parallel_process.html#ipython-on-ec2-with-starcluster)\n", "* install StarCluster\n", "\n", " $ pip install starcluster --user\n", "\n", "* setup your base config file\n", "\n", " $ starcluster help\n", " \n", " Select option 2.\n", " \n", "* [Setup your config file](http://star.mit.edu/cluster/docs/latest/quickstart.html) with your SSH keys, etc. \n", "* Add your AWS credentials\n", "* Add the few [IPython-specific lines to your config file](http://star.mit.edu/cluster/docs/latest/plugins/ipython.html#ipython-cluster-plugin)\n", "* Run\n", "\n", " $ starcluster start mycluster\n", " \n", "* You can login to your master node via ssh by running\n", "\n", " $ starcluster sshmaster mycluster -u myuser\n", " \n", " Replacing with your information as needed.\n", " \n", "* Or better yet, follow the instructions above to create a local IPython interpreter or notebook connected to your remote EC2 instance\n", " * You will need to be running the same IPython version as the one on EC2.\n", " * For the default starcluster images, this is still IPython 0.13.1\n", "* Run parallel scripts or create views and use IPython just as if you would locally." ] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Configuration for AU's Zorro HPC" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Log in to Zorro\n", "\n", "Create profile from the terminal
\n", "\n", "`ipython profile create --parallel --profile=your-profile-name` \n", "\n", "You can have as many as you want. For example, you may have a different profile depending on the queue you want to use or one with different default imports on the engines. I named mine `hpc`.
\n", "\n", "Go to
\n", "\n", "`$HOME/.ipython/profile_your-profile-name`\n", "\n", "You can make sure of your configuration directory by running\n", "\n", "`ipython locate`\n", "\n", "In this directory, edit the following lines in `ipcluster_config.py` to read\n", "\n", " c.IPClusterStart.controller_launcher_class = 'LSF'\n", " c.IPClusterEngines.engine_launcher_class = 'LSF'\n", "\n", "**Set up the controller**\n", " \n", "Edit the following lines in `ipcontroller_config.py` to read\n", "\n", " c.HubFactory.ip = '*'\n", " \n", "This is so that the controller listens on all interfaces for the engines.\n", " \n", "**Set up the engines**\n", "\n", "Edit the following lines in `ipengine_config.py`\n", "\n", " c.IPEngineApp.work_dir = u'$HOME/scratch/'\n", " c.EngineFactory.timeout=10\n", "\n", "The last step is to edit your `~/.bashrc` and add the following line\n", "\n", " export PATH=$PATH:/app/epd/bin\n", " \n", "Then type\n", "\n", " source ~/.bashrc\n", "\n", "This is so that you can run the ipcluster or ipcontroller scripts on the head node. *Alternatively*, you could create symlink in your $HOME/bin folder.\n", "\n", " ln -s /app/epd/bin/ipcluster ~/bin/\n", " ln -s /app/epd/bin/ipcontroller ~/bin/\n", "\n", "\n", "**Create your batch scripts**\n", "\n", "Make two files in your working directory that will be your batch files for the engines and the controller. I named mine `lsf.engine.template` and `lsf.controller.template`. After the initial set-up these (or similar) files will control our job submission.\n", "\n", "Tell the `ipcluster_config.py` file about your batch scripts by adding the following lines.\n", "\n", " c.LSFEngineSetLauncher.batch_template_file = \"lsf.engine.template\"\n", " c.LSFControllerLauncher.batch_template_file = \"lsf.controller.template\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**lsf.engine.template**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " #!/bin/bash\n", " \n", " #BSUB-L /bin/bash\n", " #BSUB-J ipython\n", " #BSUB-q interactive\n", " #BSUB-n {n}\n", " #BSUB-u your-email@american.edu\n", " #BSUB-N\n", " #BSUB-c 5\n", " \n", " # the ipython code\n", " \n", " # enter your working directory\n", " cd $HOME/scratch\n", " \n", " export PATH=$HOME/bin:/app/epd/bin/\n", " export PYTHONPATH=/app/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages\n", " ipengine --profile=hpc" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**lsf.controller.template**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " #!/bin/bash\n", " \n", " #BSUB-L /bin/bash\n", " #BSUB-J ipython\n", " #BSUB-q interactive\n", " #BSUB-n 1\n", " #BSUB-u your-email@american.edu\n", " #BSUB-N\n", " #BSUB-c 5 # timeout in minutes\n", " \n", " cd $HOME/scratch\n", " \n", " export PATH=$HOME/bin:/app/epd/bin/\n", " export PYTHONPATH=/app/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages\n", " ipcontroller --profile=hpc\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can run the cluster with\n", "\n", " ipcluster start --profile=hpc --n=2\n", " \n", "I run it with\n", "\n", " ipcluster start --profile=hpc --n=2 &\n", " \n", "The `&` puts the job in the background." 
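] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the LSF jobs are running, it can take a moment for the engines to register with the controller. A minimal sketch of waiting for them before submitting work (the count of 2 matches `--n=2` above):\n", "\n", "```python\n", "import time\n", "\n", "from IPython import parallel\n", "\n", "rc = parallel.Client(profile='hpc')\n", "while len(rc.ids) < 2:  # wait until both engines have registered\n", "    time.sleep(2)\n", "print 'Engines ready:', rc.ids\n", "```"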
] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Stopping Jobs" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* When your job is done you can run \n", "\n", " ipcluster stop --profile=hpc\n", " \n", "* You can also stop your engines (and the hub) from within your Python scripts by using\n", "\n", " rc.shutdown(hub=True)\n", " \n", "* There are a few other ways to do this. Consult the IPython documentation and examples/" ] }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "More Examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Logging into Zorro from the Notebook\n", "* !\n", "* This is not a typical workflow for long-running jobs\n", "* I am only doing this because **a)** I can and **b)** as a demonstration\n", "\n", "
1. Start a cluster on Zorro\n", "2. (Secure) Copy the \"ipcontroller-client.json\" to your local machine. On your local machine type\n", "\n", "        scp js2796a@zorro.american.edu:/home/js2796a/.ipython/profile_hpc/security/ipcontroller-client.json  ~/school/talks/zorro\n", "\n", "3. Run the following code, it will prompt for your password\n", "4. If you aren't doing a public demo, you can just provide the password by argument\n", "5. You might still have to type this password at the terminal as well
\n", "\n", "**NOTE:** You will need to be running the same version of IPython locally as you are running on the server. This is currently 1.1 on the AU HPC and 0.13.1 on the StarCluster images." ] }, { "cell_type": "code", "collapsed": false, "input": [ "from IPython import parallel" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "rc = parallel.Client(\"./ipcontroller-client.json\", sshserver=\"your-login@hpcserver\", timeout=60)\n", "```" ] }, { "cell_type": "code", "collapsed": false, "input": [ "rc = parallel.Client(\"./ipcontroller-client.json\", sshserver=\"js2796a@zorro.american.edu\", timeout=60)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Alternatively, you can connect to a StarCluster just as easily.\n", "\n", "```\n", " $ starcluster start mycluster\n", " \n", " $ starcluster sshmaster mycluster -u myuser\n", " \n", " skipper@master:~$ ipython\n", " In [1]: from IPython.parallel import Client\n", " \n", " In [2]: rc = Client()\n", " \n", " In [3]: view = rc[:]\n", " \n", " In [4]: view.block = True\n", " \n", " In [5]: rc.ids\n", " Out[5]: [0, 1]\n", " \n", " In [6]: view.execute(\"import socket; x = socket.gethostname()\")\n", " Out[6]: \n", " \n", " In [7]: view[\"x\"]\n", " Out[7]: ['master', 'node001']\n", "```" ] }, { "cell_type": "code", "collapsed": false, "input": [ "view = rc[:]\n", "\n", "view.block = True" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "rc.ids" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "view.execute(\"import socket; x = socket.gethostname()\")\n", "\n", "view[\"x\"]" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "Computing n*10 million digits of $\\pi$" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Two files pidigits.py and parallelpi.py are included in this repository\n", "* They are from the IPython examples/parallel folder\n", "* There are several interesting examples here that you might want to go through\n", "* Copy them to your remote working directory\n", "\n", "\n", " scp js2796a@american.edu:/home/js2796a/scratch/ ~/school/talks/zorro/parallelpi.py\n", " scp js2796a@american.edu:/home/js2796a/scratch/ ~/school/talks/zorro/pidigits.py" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from pidigits import plot_one_digit_freqs, txt_file_to_digits, one_digit_freqs\n", "#view.execute(\"from pidigits import *\")" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* SymPy is a Python library for doing symbolic mathematics\n", "* It includes support for arbitrary precision floating point numbers" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import sympy\n", "\n", "pi = sympy.pi.evalf(40)\n", "\n", "print pi" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create 10,000 digits of $\\pi$ using SymPy" ] }, { "cell_type": "code", "collapsed": false, "input": [ "pi = sympy.pi.evalf(10000)\n", "\n", "# make a sequence of strings\n", "digits = (d for d in str(pi)[2:])\n", "\n", "freqs = one_digit_freqs(digits)\n", "\n", "ax = plot_one_digit_freqs(freqs)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "* We will be using pre-computed values of $\\pi$ from Professor Yasumasa Kanada at the University of Tokyo\n", "* The digits come in a set of text files with 10 million digits each\n", "* We will have each compute node download a file\n", "* Then each node will compute the two digit count for each file\n", "* In a final step the counts from each engine be will added up\n", "* This is an example of how you implement a map-reduce-like workflow" ] }, { "cell_type": "code", "collapsed": false, "input": [ "def compute_two_digit_freqs(filename):\n", " \"\"\"\n", " Read digits of pi from a file and compute the 2 digit frequencies.\n", " \"\"\"\n", " d = txt_file_to_digits(filename)\n", " freqs = two_digit_freqs(d)\n", " return freqs\n", "\n", "def reduce_freqs(freqlist):\n", " \"\"\"\n", " Add up a list of freq counts to get the total counts.\n", " \"\"\"\n", " allfreqs = np.zeros_like(freqlist[0])\n", " for f in freqlist:\n", " allfreqs += f\n", " return allfreqs" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the number of engines available" ] }, { "cell_type": "code", "collapsed": false, "input": [ "n = len(rc)\n", "\n", "print n" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create the list of files to process." ] }, { "cell_type": "code", "collapsed": false, "input": [ "filestring = 'pi200m.ascii.%(i)02dof20'\n", "\n", "files = [filestring % {'i':i} for i in range(1,n+1)]\n", "\n", "files" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Download the data files on the engines if they don't already exist:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "view.map(fetch_pi_file, files)" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run 10 million digits on 1 engine" ] }, { "cell_type": "code", "collapsed": false, "input": [ "from timeit import default_timer as clock\n", "t1 = clock()\n", "\n", "id0 = rc.ids[0]\n", "\n", "freqs10m = rc[id0].apply_sync(compute_two_digit_freqs, files[0])\n", "t2 = clock()\n", "\n", "digits_per_second1 = 10.0e6/(t2-t1)\n", "print \"Digits per second (1 core, 10m digits): \", digits_per_second1" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now do the same on each engine in parallel" ] }, { "cell_type": "code", "collapsed": false, "input": [ "t1 = clock()\n", "# Compute the digits\n", "freqs_all = view.map(compute_two_digit_freqs, files[:n])\n", "# Add up the frequencies from each engine.\n", "freqsn10m = reduce_freqs(freqs_all)\n", "t2 = clock()\n", "digits_per_secondn = n*10.0e6/(t2-t1)\n", "print \"Digits per second (%i engines, %i0m digits): \"%(n,n), digits_per_secondn" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "print \"Speedup: \", digits_per_secondn/digits_per_second1" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "code", "collapsed": false, "input": [ "plot_two_digit_freqs(freqsn10m, figsize=(10,10))\n", "plt.title(\"2 digit sequences in %i0m digits of pi\" % n);" ], "language": "python", "metadata": {}, "outputs": [] }, { "cell_type": "heading", "level": 4, "metadata": {}, "source": [ "There's much more" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* 
[Parallel](http://ipython.org/ipython-doc/dev/parallel/parallel_multiengine.html#remote-function-decorators) [function](http://ipython.org/ipython-doc/stable/parallel/parallel_task.html#parallel-function-decorator) [decorators](http://ipython.org/ipython-doc/dev/parallel/parallel_details.html#decorators-and-remotefunctions)\n", "* [Using MPI for message passing](http://ipython.org/ipython-doc/dev/parallel/parallel_mpi.html) (or pyzmq)\n", "* [Enabling database backends for storing information on running jobs to disk](http://ipython.org/ipython-doc/dev/parallel/parallel_db.html)\n", "* [DAGs for tasks](http://ipython.org/ipython-doc/dev/parallel/dag_dependencies.html)" ] } ], "metadata": {} } ] }