{ "metadata": { "name": "P30 LoadBalancing" }, "nbformat": 3, "nbformat_minor": 0, "worksheets": [ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Load-balancing with IPython.parallel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Basic imports" ] }, { "cell_type": "code", "collapsed": false, "input": [ "import os,sys,time\n", "import numpy as np\n", "\n", "from IPython.display import display\n", "from IPython import parallel\n", "rc = parallel.Client()\n", "dview = rc[:]" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 1 }, { "cell_type": "markdown", "metadata": {}, "source": [ "This time, we create a LoadBalancedView:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "lview = rc.load_balanced_view()\n", "lview" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "pyout", "prompt_number": 2, "text": [ "" ] } ], "prompt_number": 2 }, { "cell_type": "markdown", "metadata": {}, "source": [ "LoadBalancedViews behave very much like a DirectView on a single engine:\n", "\n", "Each call to `apply()` results in a single remote computation,\n", "and the result (or AsyncResult) of that call is returned directly,\n", "rather than in a list, as in the multi-engine DirectView."
] }, { "cell_type": "code", "collapsed": true, "input": [ "e0 = rc[0]" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 3 }, { "cell_type": "code", "collapsed": false, "input": [ "from numpy.linalg import norm\n", "A = np.linspace(0, 2*np.pi, 1024)\n", "\n", "e0.apply_sync(norm, A, 2)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "pyout", "prompt_number": 6, "text": [ "116.1115241640244" ] } ], "prompt_number": 6 }, { "cell_type": "code", "collapsed": false, "input": [ "lview.apply_sync(norm, A, 2)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "pyout", "prompt_number": 7, "text": [ "116.1115241640244" ] } ], "prompt_number": 7 }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, unlike a DirectView on a single engine, a LoadBalancedView lets the IPython scheduler decide which engine does the work:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "e0.apply_sync(os.getpid)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "pyout", "prompt_number": 8, "text": [ "74967" ] } ], "prompt_number": 8 }, { "cell_type": "code", "collapsed": false, "input": [ "for i in range(2*len(rc.ids)):\n", "    print lview.apply_sync(os.getpid)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "74968\n", "74973\n", "74967\n", "74974\n", "74968\n", "74973" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "\n", "74967\n", "74974\n" ] } ], "prompt_number": 9 }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Map\n", "\n", "The LoadBalancedView also has a load-balanced version of the built-in `map()`." ] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.block = True\n", "\n", "serial_result = map(lambda x:x**10, range(32))\n", "parallel_result = lview.map(lambda x:x**10, range(32))\n", "\n", "serial_result == parallel_result" ], "language": "python", "metadata": {}, "outputs": [ 
{ "output_type": "pyout", "prompt_number": 10, "text": [ "True" ] } ], "prompt_number": 10 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Just like `apply()`, `map()` can be made non-blocking, either by setting `block=False` or by calling `map_async()`." ] }, { "cell_type": "code", "collapsed": false, "input": [ "amr = lview.map_async(lambda x:x**10, range(32))\n", "amr" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "pyout", "prompt_number": 11, "text": [ ">" ] } ], "prompt_number": 11 }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Map results are iterable!" ] }, { "cell_type": "code", "collapsed": false, "input": [ "for n in amr:\n", "    print n" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "0\n", "1\n", "1024\n", "59049\n", "1048576\n", "9765625\n", "60466176\n", "282475249\n", "1073741824\n", "3486784401\n", "10000000000\n", "25937424601\n", "61917364224\n", "137858491849\n", "289254654976\n", "576650390625\n", "1099511627776\n", "2015993900449\n", "3570467226624\n", "6131066257801\n", "10240000000000\n", "16679880978201\n", "26559922791424\n", "41426511213649\n", "63403380965376\n", "95367431640625\n", "141167095653376\n", "205891132094649\n", "296196766695424\n", "420707233300201\n", "590490000000000\n", "819628286980801\n" ] } ], "prompt_number": 12 }, { "cell_type": "markdown", "metadata": {}, "source": [ "AsyncResults with multiple results are actually iterable before their\n", "results arrive.\n", "\n", "This means that you can perform map/reduce operations on elements as\n", "they come in:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "lview.block = False" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 13 }, 
{ "cell_type": "code", "collapsed": false, "input": [ "# scatter 'id', so id=0,1,2,... on engines 0,1,2,...\n", "dv = rc[:]\n", "dv.scatter('id', rc.ids, flatten=True)\n", "print dv['id']\n", "\n", "# create a Reference to `id`. This will be a different value on each engine\n", "ref = parallel.Reference('id')\n", "\n", "tic = time.time()\n", "ar = dv.apply(time.sleep, ref)\n", "for i,r in enumerate(ar):\n", "    print \"%i: %.3f\"%(i, time.time()-tic)\n", "    sys.stdout.flush()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "[0, 1, 2, 3]\n", "0: 0.030\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "1: 1.017\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "2: 2.028\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "3: 3.026\n" ] } ], "prompt_number": 14 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we submit a bunch of tasks of increasing duration, and\n", "watch where they run, iterating through the results as they come in." ] }, 
{ "cell_type": "code", "collapsed": false, "input": [ "def sleep_here(t):\n", "    \"\"\"sleep here for a time, return where it happened\"\"\"\n", "    import time\n", "    time.sleep(t)\n", "    # `id` is the engine-local value scattered above\n", "    return id\n", "\n", "amr = lview.map(sleep_here, [.01*t for t in range(32)])\n", "tic = time.time()\n", "for i,r in enumerate(amr):\n", "    print \"task %i on engine %i: %.3f\" % (i, r, time.time()-tic)\n", "    sys.stdout.flush()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "task 0 on engine 2: 0.001\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 1 on engine 3: 0.001\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 2 on engine 1: 0.001\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 3 on engine 0: 0.002\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 4 on engine 2: 0.043\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 5 on engine 3: 0.043\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 6 on engine 1: 0.096\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 7 on engine 0: 0.108\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 8 on engine 2: 0.110\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 9 on engine 3: 0.179\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 10 on engine 1: 0.205\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 11 on engine 0: 0.207\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 12 on engine 2: 0.245\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 13 on engine 3: 0.307\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 14 on engine 1: 0.361\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 15 on engine 0: 0.370\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 16 on engine 2: 0.459\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 17 on engine 3: 0.488\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 18 on engine 1: 0.544\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 19 on engine 0: 0.582\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 20 on engine 2: 0.670\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 21 on engine 3: 0.719\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 22 on engine 1: 0.777\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 23 on engine 0: 0.829\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 24 on engine 2: 0.923\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 25 on engine 3: 0.990\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 26 on engine 1: 1.051\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 27 on engine 0: 1.107\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 28 on engine 2: 1.205\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 29 on engine 3: 1.298\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 30 on engine 1: 1.355\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 31 on engine 0: 1.441\n" ] } ], "prompt_number": 15 }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Unlike `DirectView.map()`, which always results in one task per engine,\n", "`LoadBalancedView.map()` defaults to one task per *item* in the sequence. This\n", "can be changed with the `chunksize` keyword argument, which bundles that many\n", "items into each task." ] }, { "cell_type": "code", "collapsed": true, "input": [ "amr = lview.map(sleep_here, [.01*t for t in range(32)], chunksize=4)\n", "tic = time.time()\n", "for i,r in enumerate(amr):\n", "    print \"task %i on engine %i: %.3f\" % (i, r, time.time()-tic)\n", "    sys.stdout.flush()" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "task 0 on engine 2: 0.087\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 1 on engine 2: 0.088\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 2 on engine 2: 0.088\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 3 on engine 2: 0.089\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 4 on engine 3: 0.255\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 5 on engine 3: 0.256\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 6 on engine 3: 0.256\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 7 on engine 3: 0.257\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 8 on engine 1: 0.425\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 9 on engine 1: 0.425\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 10 on engine 1: 0.426\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 11 on engine 1: 0.427\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 12 on engine 0: 0.576\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 13 on engine 0: 0.577\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 14 on engine 0: 0.577\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 15 on engine 0: 0.578\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 16 on engine 2: 0.798\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 17 on engine 2: 0.798\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 18 on engine 2: 0.799\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 19 on engine 2: 0.799\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 20 on engine 3: 1.124\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 21 on engine 3: 1.125\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 22 on engine 3: 1.125\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 23 on engine 3: 1.126\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 24 on engine 1: 1.452\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 25 on engine 1: 1.452\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 26 on engine 1: 1.453\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 27 on engine 1: 1.453\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 28 on engine 0: 1.764\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 29 on engine 0: 1.764\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 30 on engine 0: 1.765\n" ] }, { "output_type": "stream", "stream": "stdout", "text": [ "task 31 on engine 0: 1.765\n" ] } ], "prompt_number": 17 }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Example\n", "\n", "## Parallelize nested loops\n", "\n", "Often we want to run a function with a variety of combinations of arguments.\n", "A useful skill is the ability to express a nested loop in terms of a map." ] }, { "cell_type": "code", "collapsed": false, "input": [ "def area(w,h):\n", "    return w*h\n", "\n", "\n", "widths = range(1,4)\n", "heights = range(6,10)\n", "\n", "areas = []\n", "for w in widths:\n", "    for h in heights:\n", "        areas.append(area(w,h))\n", "areas" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "pyout", "prompt_number": 15, "text": [ "[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]" ] } ], "prompt_number": 15 }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Exercise:** parallelize the nested loop above with `lview.map`, storing the result as `ar`.\n", "\n", "*Hint:* `itertools.product` and `zip` will be helpful." ] }, 
{ "cell_type": "code", "collapsed": true, "input": [ "%loadpy soln/nestedloop.py" ], "language": "python", "metadata": {}, "outputs": [], "prompt_number": 17 }, { "cell_type": "markdown", "metadata": {}, "source": [ "Validate the result:" ] }, { "cell_type": "code", "collapsed": false, "input": [ "print areas\n", "print list(ar)\n", "areas == list(ar)" ], "language": "python", "metadata": {}, "outputs": [ { "output_type": "stream", "stream": "stdout", "text": [ "[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]\n", "[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]\n" ] }, { "output_type": "pyout", "prompt_number": 19, "text": [ "True" ] } ], "prompt_number": 19 }, { "cell_type": "heading", "level": 2, "metadata": {}, "source": [ "Further Information" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load-balancing gives you a lot of control that we don't have time to discuss here:\n", "\n", "* Full DAG task dependency (in time and/or destination)\n", "* Functional dependencies to confine tasks to engines with appropriate capabilities\n", "* Specifying a subset of engines for each task\n", "* Scheduling timeouts\n", "* Retries of failed tasks\n", "* `TaskScheduler.hwm` for greedy assignment of tasks, to hide network latency behind computation\n", "* and more!" ] } ], "metadata": {} } ] }